Posted to users@kafka.apache.org by "Maier, Dr. Andreas" <an...@asideas.de> on 2013/09/06 10:41:40 UTC

Java junit test for a Kafka producer returns "failed to collate messages by topic"

Hello,

I wrote a simple junit test to test a Kafka producer.

public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString());

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        AdminUtils.createTopic(zkClient, topic, 1, 1, new Properties());

        // setup producer
        Properties properties = TestUtils.getProducerConfig(
                "localhost:" + port, "kafka.producer.DefaultPartitioner");

        ProducerConfig pConfig = new ProducerConfig(properties);
        Producer producer = new Producer(pConfig);

        // send message
        KeyedMessage<Integer, String> data =
                new KeyedMessage(topic, "test-message");

        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asBuffer(messages));

        // cleanup
        producer.close();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }

}


However, when I run the test, I get the following error messages:

[2013-09-06 10:23:04,970] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:05,988] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:06,998] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)
[2013-09-06 10:23:08,009] ERROR Failed to collate messages by topic, partition due to: Failed to fetch topic metadata for topic: test (kafka.producer.async.DefaultEventHandler:97)

kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
	at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
	at kafka.producer.Producer.send(Producer.scala:74)
	at de.ideas.fingerpost.kafka.KafkaProducerTest.producerTest(KafkaProducerTest.java:57)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:77)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:195)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:63)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

[2013-09-06 10:23:09,017] ERROR Failed to send requests for topics test with correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler:97)

I tried to write my unit test following the Scala unit tests in the Kafka
core, but it seems I'm still missing something basic to make it work.
Can someone help me with that? I'm developing on Mac OS X 10.8.3 and
compiled the latest Kafka (plus the TestUtils) from the git repository
using Scala 2.9.2.


Regards,

Andreas Maier




Re: Java junit test for a Kafka producer returns "failed to collate messages by topic"

Posted by "Maier, Dr. Andreas" <an...@asideas.de>.
I tried that, but I still got

junit.framework.AssertionFailedError: Partition [test,0] metadata not propagated after timeout
	at kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:506)
	at kafka.utils.TestUtils.waitUntilMetadataIsPropagated(TestUtils.scala)
...

However, it seems I found a solution to my problem. I had to replace

ZkClient zkClient = new ZkClient(zkServer.connectString());


with

ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

Then suddenly my test ran without problems.
It seems that the ZkClient cannot be used with Kafka unless it is given
the ZKStringSerializer.
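
A likely explanation, as far as I understand it: the single-argument ZkClient
constructor falls back to Java object serialization, while Kafka reads and
writes its ZooKeeper nodes as plain UTF-8 strings, so the two sides do not
agree on the bytes. The sketch below only illustrates that mismatch with the
JDK; it does not use Kafka or ZkClient, and the class and method names
(SerializerMismatchDemo, javaSerialize, utf8Serialize) are made up for
illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; it does not touch Kafka or ZkClient.
class SerializerMismatchDemo {

    // Roughly what a Java-serialization-based serializer stores for a String.
    static byte[] javaSerialize(String value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Roughly what a plain string serializer stores: just the UTF-8 bytes.
    static byte[] utf8Serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String payload = "{\"version\":1}";
        byte[] javaBytes = javaSerialize(payload);
        byte[] utf8Bytes = utf8Serialize(payload);

        // The Java-serialized form carries a binary stream header and type
        // tag in front of the text, so it is not valid JSON on its own.
        System.out.println("java serialization: " + javaBytes.length + " bytes");
        System.out.println("plain UTF-8:        " + utf8Bytes.length + " bytes");
        System.out.println("UTF-8 reads back as text: "
                + new String(utf8Bytes, StandardCharsets.UTF_8).equals(payload));
    }
}
```

If this is indeed the cause, the topic data written through the default
ZkClient would not be readable by the broker, which would fit the metadata
never being propagated.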

Regards,

Andreas Maier



On 06.09.13 at 13:52, "Joe Stein" <cr...@gmail.com> wrote:

>The topic may not have been created at the broker yet ... take a look at
>ProducerTest.scala as an example.
>
>After creating the topic, you could try
>TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) to assert
>that the topic has in fact reached the broker before sending.
>
>/*******************************************
> Joe Stein
> Founder, Principal Consultant
> Big Data Open Source Security LLC
> http://www.stealth.ly
> Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
>********************************************/
>
>


Re: Java junit test for a Kafka producer returns "failed to collate messages by topic"

Posted by Joe Stein <cr...@gmail.com>.
The topic may not have been created at the broker yet ... take a look at
ProducerTest.scala as an example.

After creating the topic, you could try
TestUtils.waitUntilMetadataIsPropagated(servers, topic, 0, 500) to assert
that the topic has in fact reached the broker before sending.
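
To illustrate the general "wait until the topic is there" idea, here is a
minimal polling sketch in plain Java (8 or later). It is not the TestUtils
implementation; the class WaitUntil, the method waitUntilTrue, and its
parameters are hypothetical names, and the condition you pass in would have
to be your own check that the broker knows the topic.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Kafka's TestUtils.
class WaitUntil {

    // Polls the condition every pollMs milliseconds until it holds or
    // timeoutMs milliseconds have elapsed. Returns true if it held in time.
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in condition that becomes true after about 200 ms.
        long readyAt = System.currentTimeMillis() + 200;
        boolean ready = waitUntilTrue(() -> System.currentTimeMillis() >= readyAt, 2000, 20);
        System.out.println("condition met: " + ready);
    }
}
```

In a test you would fail the test when the helper returns false, rather
than go on and send to a topic the broker may not know about yet.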

/*******************************************
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
********************************************/

