Posted to dev@kafka.apache.org by "Sam Meder (JIRA)" <ji...@apache.org> on 2013/06/18 10:09:20 UTC

[jira] [Commented] (KAFKA-945) Problem with test to send a message and then consume it

    [ https://issues.apache.org/jira/browse/KAFKA-945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13686490#comment-13686490 ] 

Sam Meder commented on KAFKA-945:
---------------------------------

We had a similar issue, and it was caused by a change in consumer initialization. You need to add

cProps.setProperty("auto.offset.reset", OffsetRequest.SmallestTimeString());

when initializing your consumer.
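
For context, a minimal sketch of the consumer properties with that setting added. As far as I know, the 0.8 consumer defaults auto.offset.reset to "largest", so a new group with no committed offset starts at the end of the log and never sees a message that was produced before it connected. The sketch uses only java.util.Properties so it runs without Kafka on the classpath; the class name, the helper method, and the literal "smallest" (which I assume is what OffsetRequest.SmallestTimeString() returns in 0.8) are illustrative assumptions, not part of the attached test.

```java
import java.util.Properties;

public class ConsumerPropsSketch {
    // Assumption: in Kafka 0.8, OffsetRequest.SmallestTimeString() evaluates to "smallest".
    static final String SMALLEST = "smallest";

    // Hypothetical helper: builds the consumer properties from the test,
    // plus the auto.offset.reset setting suggested above.
    static Properties buildConsumerProps(String zkConnect, String groupId) {
        Properties cProps = new Properties();
        cProps.setProperty("zookeeper.connect", zkConnect);
        cProps.setProperty("group.id", groupId);
        // Start from the earliest available offset when the group has no committed offset.
        cProps.setProperty("auto.offset.reset", SMALLEST);
        return cProps;
    }

    public static void main(String[] args) {
        Properties cProps = buildConsumerProps("localhost:2181", "group1");
        System.out.println(cProps.getProperty("auto.offset.reset"));
    }
}
```

In the test from the issue, the resulting Properties object would be passed to new ConsumerConfig(cProps) exactly as before.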
                
> Problem with test to send a message and then consume it
> -------------------------------------------------------
>
>                 Key: KAFKA-945
>                 URL: https://issues.apache.org/jira/browse/KAFKA-945
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Jason Rosenberg
>         Attachments: kafak-945.out, kafka-945.tar.gz
>
>
> A simple test, which sends one message synchronously and then consumes it, is failing with the latest 0.8 beta release candidate (produced from sha: 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4).
> Note this problem did not occur with a previous version of the 0.8 branch (e.g. it seems to work fine for sha: 988d4d8e65a14390abd748318a64e281e4a37c19).
> Essentially, it appears that the message never gets sent (after complaining about missing partition leader, etc.).
> To reproduce, run the sample zookeeper and kafka scripts that come with the distribution (but first delete all pre-existing state by removing the data directories that zookeeper and kafka use):
> rm -rf /tmp/zookeeper
> rm -rf /tmp/kafka_logs
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> Then execute the simple test code (I will attach a tarball which you should be able to unpack to run this example).
>   @Test public void produceAndConsumeMessage() throws Exception {
>     // assumes zookeeper and kafka server are running.
>     String zkConnect = "localhost:2181";
>     int port = 9092;
>     Properties pProps = new Properties();
>     pProps.put("metadata.broker.list", "localhost:" + port);
>     pProps.put("serializer.class", "kafka.serializer.StringEncoder");
>     ProducerConfig pConfig = new ProducerConfig(pProps);
>     Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
>     KeyedMessage<Integer, String> data =
>         new KeyedMessage<Integer, String>("test-topic", "test-message");
>     producer.send(data);
>     producer.close();
>     Properties cProps = new Properties();
>     cProps.put("zookeeper.connect", zkConnect);
>     cProps.put("group.id", "group1");
>     ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
>     ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
>     Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
>         consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
>     List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test-topic");
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
>     final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
>     // run in a separate thread
>     final AtomicBoolean success = new AtomicBoolean(false);
>     Thread consumeThread = new Thread(new Runnable() {
>       public void run() {
>         while (iter.hasNext()) {
>           byte[] msg = iter.next().message();
>           String msgStr = new String(msg);
>           success.set(msgStr.equals("test-message"));
>           break;
>         }
>       }
>     });
>     consumeThread.start();
>     consumeThread.join();
>     consumerConnector.shutdown();
>     assertTrue(success.get());
>   }
