You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Sam Meder (JIRA)" <ji...@apache.org> on 2013/06/18 10:09:20 UTC
[jira] [Commented] (KAFKA-945) Problem with test to send a message
and then consume it
[ https://issues.apache.org/jira/browse/KAFKA-945?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13686490#comment-13686490 ]
Sam Meder commented on KAFKA-945:
---------------------------------
We had a similar issue and it was due to a change in consumer initialization. You need to add
cProps.setProperty("auto.offset.reset", OffsetRequest.SmallestTimeString());
when initializing your consumer.
> Problem with test to send a message and then consume it
> -------------------------------------------------------
>
> Key: KAFKA-945
> URL: https://issues.apache.org/jira/browse/KAFKA-945
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.8
> Reporter: Jason Rosenberg
> Attachments: kafak-945.out, kafka-945.tar.gz
>
>
> A simple test, which sends on message synchronously, and then consumes it, is failing, with the latest 0.8 beta release candidate (produced from sha: 23acbd309f5e17de71db46cb6f1a60c8d38ea4e4
> Note this problem did not occur with a previous version of the 0.8 branch (e.g. it seems to work fine for sha: 988d4d8e65a14390abd748318a64e281e4a37c19).
> Essentially, it appears that the message never gets sent (after complaining about missing partition leader, etc.).
> To reproduce, run the sample zookeeper and kafka scripts, that come with the distribution (but first delete all pre-existing state by removing the data directories that zookeeper and kafka use:
> rm -rf /tmp/zookeeper
> rm -rf/tmp/kafka_logs
> ./bin/zookeeper-server-start.sh config/zookeeper.properties
> ./bin/kafka-server-start.sh config/server.properties
> Then execute the simple test code (I will attach a tarball which you should be able to unpack and run this example).
> @Test public void produceAndConsumeMessage() throws Exception {
> // assumes zookeeper and kafka server are running.
> String zkConnect = "localhost:2181";
> int port = 9092;
> Properties pProps = new Properties();
> pProps.put("metadata.broker.list", "localhost:" + port);
> pProps.put("serializer.class", "kafka.serializer.StringEncoder");
> ProducerConfig pConfig = new ProducerConfig(pProps);
> Producer<Integer, String> producer = new Producer<Integer, String>(pConfig);
> KeyedMessage<Integer, String> data =
> new KeyedMessage<Integer, String>("test-topic", "test-message");
> producer.send(data);
> producer.close();
> Properties cProps = new Properties();
> cProps.put("zookeeper.connect", zkConnect);
> cProps.put("group.id", "group1");
> ConsumerConfig consumerConfig = new ConsumerConfig(cProps);
> ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
> Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
> consumerConnector.createMessageStreams(ImmutableMap.of("test-topic", 1));
> List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("test-topic");
> final KafkaStream<byte[], byte[]> stream = streams.get(0);
> final ConsumerIterator<byte[], byte[]> iter = stream.iterator();
> // run in a separate thread
> final AtomicBoolean success = new AtomicBoolean(false);
> Thread consumeThread = new Thread(new Runnable() {
> public void run() {
> while (iter.hasNext()) {
> byte[] msg = iter.next().message();
> String msgStr = new String(msg);
> success.set(msgStr.equals("test-message"));
> break;
> }
> }
> });
> consumeThread.start();
> consumeThread.join();
> consumerConnector.shutdown();
> assertTrue(success.get());
> }
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira