Posted to users@kafka.apache.org by "Maier, Dr. Andreas" <an...@asideas.de> on 2013/09/06 10:41:40 UTC
Java junit test for a Kafka producer returns "failed to collate messages by topic"
Hello,
I wrote a simple junit test to test a Kafka producer.
public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString());

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port,
                "kafka.producer.DefaultPartitioner");

        ProducerConfig pConfig = new ProducerConfig(properties);
        Producer producer = new Producer(pConfig);

        // send message
        KeyedMessage<Integer, String> data = new KeyedMessage(topic, "test-message");

        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asBuffer(messages));

        // cleanup
        producer.close();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}
However, when I run the test I get the following error messages:
[2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
    at kafka.producer.Producer.send(Producer.scala:74)
    at de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.java:57)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
[2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)
I tried to write my unit test following the Scala unit tests in the Kafka
core, but it seems I'm still missing something basic to make it work.
Can someone help me with that? I'm developing on Mac OS X 10.8.3 and
compiled the latest Kafka (plus the TestUtils) from the git repository
using Scala 2.9.2.
Regards,
Andreas Maier
Re: Java junit test for a Kafka producer returns "failed to collate messages by topic"
Posted by "Maier, Dr. Andreas" <an...@asideas.de>.
I tried that, but I still got:
junit.framework.AssertionFailedError: Partition [test,0] metadata not propagated after timeout
    at kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:506)
    at kafka.utils.TestUtils.waitUntilMetadataIsPropagated(TestUtils.scala)
...
However, it seems I found a solution for my problem. I had to replace
ZkClient zkClient = new ZkClient(zkServer.connectString());
with
ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);
Then my test suddenly ran without problems.
It seems that the ZkClient cannot be used here without the
ZKStringSerializer.
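A likely explanation, offered as an assumption rather than something confirmed in this thread: a ZkClient created without an explicit serializer stores values using Java object serialization, while ZKStringSerializer stores them as plain UTF-8 strings. Data written one way would then be unreadable to code expecting the other. The sketch below only mimics the two encodings with JDK classes; it does not use Kafka or ZkClient, and the class name, method names, and sample payload are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration only: hypothetical helper, not part of Kafka or ZkClient.
class SerializerMismatchDemo {

    // Bytes a Java-serialization based serializer would store for a String.
    static byte[] javaSerialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            // Not expected for an in-memory stream; rethrow unchecked.
            throw new IllegalStateException(e);
        }
        return bytes.toByteArray();
    }

    // Bytes a plain string serializer would store.
    static byte[] utf8Serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Made-up sample payload standing in for topic data kept in ZooKeeper.
        String json = "{\"version\":1,\"partitions\":{\"0\":[0]}}";
        byte[] serialized = javaSerialize(json);
        byte[] plain = utf8Serialize(json);

        System.out.println("same bytes: " + Arrays.equals(serialized, plain));
        // Java serialization prepends a stream header, so the payload no
        // longer starts with '{' and a JSON reader would reject it.
        System.out.println("java-serialized starts with '{': " + (serialized[0] == '{'));
        System.out.println("utf8 starts with '{': " + (plain[0] == '{'));
    }
}
```

If this reading is right, it would explain why the topic created through the default ZkClient never became visible to the broker in the test.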
Regards,
Andreas Maier
On 06.09.13 at 13:52, "Joe Stein" <cr...@gmail.com> wrote:
>The topic maybe is not created at the broker yet ... take a look at
>ProducerTest.scala as example
>
>You could try TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0,
>500) to assert that the topic is in fact at the broker before sending after
>creating it
>
Re: Java junit test for a Kafka producer returns "failed to collate messages by topic"
Posted by Joe Stein <cr...@gmail.com>.
The topic may not have been created at the broker yet. Take a look at
ProducerTest.scala as an example.
After creating the topic, you could try
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) to assert
that the topic is in fact at the broker before sending.
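The idea behind waiting for metadata can be sketched as a generic poll-with-timeout loop. This is not the Kafka TestUtils implementation; the class name, method name, and parameters below are hypothetical, and the condition in main is just a stand-in for "the topic metadata is visible at the broker".

```java
import java.util.function.BooleanSupplier;

// Illustration only: a hypothetical polling helper, not Kafka's TestUtils.
class WaitUntil {

    // Checks the condition every pollMs milliseconds until it holds or
    // timeoutMs milliseconds have elapsed. Returns true if the condition
    // held before the deadline, false otherwise.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Stand-in condition that becomes true after roughly 50 ms.
        boolean ready = waitUntilTrue(
                () -> System.currentTimeMillis() - start >= 50, 1000, 10);
        System.out.println("ready: " + ready);
    }
}
```

In a test, the send would go after such a wait, and a false result would be turned into a test failure instead of letting the producer retry and give up.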
/*******************************************
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/