You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Rodrigo Ottero <ot...@gmail.com> on 2016/06/17 16:33:20 UTC

Messages delayed in jUnit test (version 0.9.0.0)

Hi.

I am trying to use an embedded Kafka server to allow me to create tests in
jUnit using a real Kafka implementation, instead of a stub or a mock.

I am using Kafka version 0.9.0.0.

The test works, but the consumer poll has to wait for 3 seconds to get the
message.

Here is the code I am running:

----------
        // given a Kafka producer, a kafka consumer, a topic and a message
        final Producer<String, String> producer = createKafkaProducer();
        final KafkaConsumer<String, String> consumer =
createKafkaConsumer(topicName);

        // when the producer publishes the message to the topic
        producer.send(new ProducerRecord<String, String>(topicName,
message));

        // then the consumer can read it from the topic
        final ConsumerRecords<String, String> records = consumer.poll(3000);
        assertTrue("The topic should have only one message. The number of
messages found is: " + records.count(),
​​
records.count() == 1);
        consumer.close();
----------

For this to work, consumer.poll(...) must be 3 seconds to get the message,
otherwise the consumer will get an empty ConsumerRecords (
​
records.count() == 0).

I tried sleeping the execution for 10 seconds before reaching the consumer,
to see if it was some delay caused by the server startup, but it did not
change the poll's time.

However, if I use the kafka-console-consumer.bat present in Kafka
installation and point it to my embedded server, it reads it almost
immediately.

Here is the configuration used in the test's consumer:

----------
        final Properties props = new Properties();
        props.put("bootstrap.servers", kafkaServerAndPort);
        props.put("group.id", "anyGroupId");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
----------

Would anyone have some idea why it is taking 3 seconds for the poll to grab
the message?

Thanks in advance!

PS: here is part of the code used to startup the Kafka and Zookeeper
servers:

-----------
    public EmbeddedKafkaAndZookeeperServers() throws Exception {
        removeExistingLogFiles();
        startZookeeperServer();
        startKafkaServer();
    }

    private void startKafkaServer() throws IOException {
        final KafkaConfig config =
getKafkaConfig(zookeeperServer.getConnectString());
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();
    }

    private void startZookeeperServer() throws Exception {
        final int shouldUseARandomPort = -1;
        final File temporaryDirectory = new File(ZOOKEEPER_LOGS_DIRECTORY);
        final boolean shouldStartImmediately = true;
        zookeeperServer = new TestingServer(shouldUseARandomPort,
temporaryDirectory, shouldStartImmediately);
    }

    private static KafkaConfig getKafkaConfig(final String
zookeeperConnectString) throws IOException {
        final Properties props = new Properties();
        props.put("broker.id", BROKER_ID);
        props.put("port", BROKER_PORT);
        props.put("log.dir", KAFKA_LOGS_DIRECTORY);
        props.put("zookeeper.connect", zookeeperConnectString);
        props.put("host.name", "127.0.0.1");
        props.put("auto.create.topics.enable", "true");
        return new KafkaConfig(props);
    }

-----------

Re: Messages delayed in jUnit test (version 0.9.0.0)

Posted by Rodrigo Ottero <ot...@gmail.com>.
Hello.

I still could not progress in this issue.

As per Jay Kreps recent email in thread 'delay of producer and consumer in
kafka 0.9 is too big to be accepted', I will do the *TestEndToEndLatency *test
to see my latency.

But, besides that, am I doing something wrong in the code below?

Thanks & regards,

Rodrigo

​​
---------- Forwarded message ----------
From: Rodrigo Ottero <ot...@gmail.com>
Date: Fri, Jun 17, 2016 at 5:33 PM
Subject: Messages delayed in jUnit test (version 0.9.0.0)
To: users@kafka.apache.org


Hi.

I am trying to use an embedded Kafka server to allow me to create tests in
jUnit using a real Kafka implementation, instead of a stub or a mock.

I am using Kafka version 0.9.0.0.

The test works, but the consumer poll has to wait for 3 seconds to get the
message.

Here is the code I am running:

----------
        // given a Kafka producer, a kafka consumer, a topic and a message
        final Producer<String, String> producer = createKafkaProducer();
        final KafkaConsumer<String, String> consumer =
createKafkaConsumer(topicName);

        // when the producer publishes the message to the topic
        producer.send(new ProducerRecord<String, String>(topicName,
message));

        // then the consumer can read it from the topic
        final ConsumerRecords<String, String> records = consumer.poll(3000);
        assertTrue("The topic should have only one message. The number of
messages found is: " + records.count(),
​​
records.count() == 1);
        consumer.close();
----------

For this to work, consumer.poll(...) must be 3 seconds to get the message,
otherwise the consumer will get an empty ConsumerRecords (
​
records.count() == 0).

I tried sleeping the execution for 10 seconds before reaching the consumer,
to see if it was some delay caused by the server startup, but it did not
change the poll's time.

However, if I use the kafka-console-consumer.bat present in Kafka
installation and point it to my embedded server, it reads it almost
immediately.

Here is the configuration used in the test's consumer:

----------
        final Properties props = new Properties();
        props.put("bootstrap.servers", kafkaServerAndPort);
        props.put("group.id", "anyGroupId");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topicName));
----------

Would anyone have some idea why it is taking 3 seconds for the poll to grab
the message?

Thanks in advance!

PS: here is part of the code used to startup the Kafka and Zookeeper
servers:

-----------
    public EmbeddedKafkaAndZookeeperServers() throws Exception {
        removeExistingLogFiles();
        startZookeeperServer();
        startKafkaServer();
    }

    private void startKafkaServer() throws IOException {
        final KafkaConfig config =
getKafkaConfig(zookeeperServer.getConnectString());
        kafkaServer = new KafkaServerStartable(config);
        kafkaServer.startup();
    }

    private void startZookeeperServer() throws Exception {
        final int shouldUseARandomPort = -1;
        final File temporaryDirectory = new File(ZOOKEEPER_LOGS_DIRECTORY);
        final boolean shouldStartImmediately = true;
        zookeeperServer = new TestingServer(shouldUseARandomPort,
temporaryDirectory, shouldStartImmediately);
    }

    private static KafkaConfig getKafkaConfig(final String
zookeeperConnectString) throws IOException {
        final Properties props = new Properties();
        props.put("broker.id", BROKER_ID);
        props.put("port", BROKER_PORT);
        props.put("log.dir", KAFKA_LOGS_DIRECTORY);
        props.put("zookeeper.connect", zookeeperConnectString);
        props.put("host.name", "127.0.0.1");
        props.put("auto.create.topics.enable", "true");
        return new KafkaConfig(props);
    }

-----------