Posted to users@kafka.apache.org by Mike Thomsen <mi...@gmail.com> on 2016/02/05 16:26:42 UTC

Running Kafka in unit tests

This is how I set up my JUnit test to get Kafka and ZooKeeper running for
the duration of the test:

    static {
        embeddedZKServer = new TestingServer();
        embeddedKafkaServerPort = TestUtils.RandomPort();

        Properties brokerProperties = TestUtils.createBrokerConfig(1, embeddedZKServer.getConnectString(), true, false,
                TestUtils.RandomPort(), scala.None$.MODULE$, scala.None$.MODULE$, true, false, TestUtils.RandomPort(),
                false, TestUtils.RandomPort(), false, TestUtils.RandomPort());
        brokerProperties.setProperty("zookeeper.connect", embeddedZKServer.getConnectString());
        embeddedKafkaServer = TestUtils.createServer(new KafkaConfig(brokerProperties), new MockTime());
        KAFKA_PORT = embeddedKafkaServer.socketServer().boundPort(SecurityProtocol.PLAINTEXT)
        println "${"\n"*10}Kafka is running on port ${KAFKA_PORT}"
    }
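
For completeness, the fields and imports that the block above relies on look
roughly like this with the 0.9.x client/test jars and Curator's test module;
the declarations are a sketch of what I have, not copied verbatim:

    import java.util.Properties

    import org.apache.curator.test.TestingServer          // embedded ZooKeeper from Curator's test module
    import org.apache.kafka.common.protocol.SecurityProtocol

    import kafka.server.KafkaConfig
    import kafka.server.KafkaServer
    import kafka.utils.MockTime
    import kafka.utils.TestUtils

    // Declared on the test class so the static initializer and the tests can share them.
    static TestingServer embeddedZKServer
    static int embeddedKafkaServerPort
    static KafkaServer embeddedKafkaServer
    static int KAFKA_PORT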

This is the unit test:

@Test
public void testPost() {
    String doc = System.getResourceAsStream("/document.xml").text
    def entity = new Entity<String>(doc, MediaType.APPLICATION_XML_TYPE)
    Properties props = new Properties();
    props.put("bootstrap.servers", ["localhost:$KAFKA_PORT".toString()]);
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    ["topic1", "topic2"].each { topic ->
        Thread.start {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(topic));
            def done = false
            while (!done) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                Thread.sleep(100)
                if (records.count() > 5) {
                    done = true
                    println "Done!"
                    println records.iterator().next().value()
                }
            }
        }
    }

    0.upto(25) {
        /* operation that posts to kafka */
    }
    Thread.sleep(120000)
}
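
The posting step is elided above; for reference, the kind of producer call it
stands in for looks roughly like this. The topic name, key, and producer
settings here are placeholder choices of mine, not the actual code:

    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerRecord

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:" + KAFKA_PORT);
    producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(producerProps);
    try {
        // Send the document a few times and block on each ack so the
        // consumer threads actually have something to poll.
        for (int i = 0; i < 26; i++) {
            producer.send(new ProducerRecord<String, String>("topic1", "key-" + i, doc)).get();
        }
    } finally {
        producer.close();
    }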


Oddly enough, things were running fine yesterday. Now I get the following
errors, which show that the threaded consumer code never completes:

org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 1 : {topic1=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 1 : {topic2=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 3 : {topic2=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 3 : {topic1=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 5 : {topic2=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 5 : {topic1=LEADER_NOT_AVAILABLE}
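
As far as I can tell, LEADER_NOT_AVAILABLE during a metadata fetch usually just
means the topics are still being auto-created and no leader has been elected by
the time the consumers ask for metadata. One thing that might help is creating
the topics explicitly in the setup, before any consumer starts. A rough sketch
of that idea; the AdminUtils/ZkUtils calls are an assumption on my part and not
part of the setup above:

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils

    // Create the topics up front so the broker doesn't have to auto-create
    // them while the consumers are already fetching metadata.
    ZkUtils zkUtils = ZkUtils.apply(embeddedZKServer.getConnectString(), 30000, 30000, false)
    try {
        ["topic1", "topic2"].each { topic ->
            AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties())
        }
    } finally {
        zkUtils.close()
    }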

Any ideas? Are there any samples I could use to work through this?

Thanks,

Mike