Posted to users@kafka.apache.org by Mike Thomsen <mi...@gmail.com> on 2016/02/05 16:26:42 UTC
Running Kafka in unit tests
This is how I set up my JUnit test to keep Kafka and ZooKeeper running
for the duration of the test:
static {
    // Start an in-process ZooKeeper for the broker to connect to.
    embeddedZKServer = new TestingServer();
    embeddedKafkaServerPort = TestUtils.RandomPort();
    Properties brokerProperties = TestUtils.createBrokerConfig(1,
        embeddedZKServer.getConnectString(), true,
        false, TestUtils.RandomPort(), scala.None$.MODULE$,
        scala.None$.MODULE$, true, false, TestUtils.RandomPort(), false,
        TestUtils.RandomPort(),
        false, TestUtils.RandomPort());
    brokerProperties.setProperty("zookeeper.connect",
        embeddedZKServer.getConnectString());
    embeddedKafkaServer = TestUtils.createServer(
        new KafkaConfig(brokerProperties), new MockTime());
    // Ask the broker which port it actually bound for PLAINTEXT.
    KAFKA_PORT = embeddedKafkaServer.socketServer().boundPort(SecurityProtocol.PLAINTEXT)
    println "${"\n"*10}Kafka is running on port ${KAFKA_PORT}"
}
This is the unit test:
@Test
public void testPost() {
    // Load the XML fixture from the test classpath.
    String doc = getClass().getResourceAsStream("/document.xml").text
    def entity = new Entity<String>(doc, MediaType.APPLICATION_XML_TYPE)

    Properties props = new Properties();
    props.put("bootstrap.servers", ["localhost:$KAFKA_PORT".toString()]);
    props.put("group.id", "test");
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    // One consumer thread per topic; each stops once a single poll
    // returns more than five records.
    ["topic1", "topic2"].each { topic ->
        Thread.start {
            KafkaConsumer<String, String> consumer =
                new KafkaConsumer<String, String>(props);
            consumer.subscribe(Arrays.asList(topic));
            def done = false
            while (!done) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                Thread.sleep(100)
                if (records.count() > 5) {
                    done = true
                    println "Done!"
                    println records.iterator().next().value()
                }
            }
        }
    }

    0.upto(25) {
        /* operation that posts to kafka */
    }

    // Give the consumer threads time to finish.
    Thread.sleep(120000)
}
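For comparison, here is a minimal sketch, in plain Java and using only the JDK, of one possible alternative to the fixed two-minute sleep: each reader thread keeps a running total across polls and counts down a CountDownLatch, and the test waits on the latch with a timeout. The BlockingQueue objects are hypothetical stand-ins for the two topics, and LatchedConsumerSketch, startReader and runScenario are made-up names; the sketch does not touch Kafka itself.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class LatchedConsumerSketch {

    // Hypothetical stand-in for a consumer thread: drains `source` until it has
    // seen more than `threshold` records in total, then counts down the latch.
    static Thread startReader(BlockingQueue<String> source, int threshold, CountDownLatch done) {
        Thread t = new Thread(() -> {
            int seen = 0; // running total across polls, not a per-poll count
            try {
                while (seen <= threshold) {
                    // Loosely analogous to consumer.poll(100): wait up to 100 ms.
                    String record = source.poll(100, TimeUnit.MILLISECONDS);
                    if (record != null) {
                        seen++;
                    }
                }
                done.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop quietly when interrupted
            }
        });
        t.setDaemon(true); // do not keep the JVM alive if the test gives up
        t.start();
        return t;
    }

    // Feeds both stand-in topics and reports whether both readers
    // finished before the timeout expired.
    static boolean runScenario(int recordsPerTopic, long timeoutMs) throws InterruptedException {
        BlockingQueue<String> topic1 = new LinkedBlockingQueue<>();
        BlockingQueue<String> topic2 = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(2);
        startReader(topic1, 5, done);
        startReader(topic2, 5, done);
        for (int i = 0; i < recordsPerTopic; i++) {
            topic1.put("doc-" + i);
            topic2.put("doc-" + i);
        }
        // Returns as soon as both readers are done, or false on timeout.
        return done.await(timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("finished in time: " + runScenario(26, 5000));
    }
}
```

In a JUnit test the boolean returned by await could be passed to assertTrue, so the test would fail on a timeout rather than sleep for the full two minutes regardless of outcome.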
Oddly enough, things were running fine yesterday. Now I get the following
errors, and the threaded code never completes:
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 1 : {topic1=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 1 : {topic2=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 3 : {topic2=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 3 : {topic1=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 5 : {topic2=LEADER_NOT_AVAILABLE}
org.apache.kafka.clients.NetworkClient: Error while fetching metadata with correlation id 5 : {topic1=LEADER_NOT_AVAILABLE}
Any ideas? Are there any samples I could work from to debug this?
Thanks,
Mike