You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Shafaq <s....@gmail.com> on 2013/10/29 18:52:29 UTC

0.8 Head: Simple consumer no receiving messages

Hi,
   I see the following scenario:

1. Send messages under some topic X, able to see the log folder in Kafka
Broker with name X-0 (Zeroth partition) and having files xxx.log and
xxx.index under them. So guess this is fine

2. THen I fire up the consumer for topic X, it is able to find two streams
(mapping to two partitions I have defined).


        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
consumer
                .createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = null;
        Iterator<String> it = topicCountMap.keySet().iterator();
        int threadNumber= 0;
        while(it.hasNext()) {
            String topic = it.next();
            streams = consumerMap.get(topic);
            for (KafkaStream stream : streams) {
                System.out.println("threadNo =" + threadNumber + "  for
topic = " + topic );
                new ConsumerThreadRunnable(stream, threadNumber, topic));
                threadNumber++;
            }
        }

However I don't get any messages in the CounsmerTHreanRunnable here
ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext() ) {
            byte[] nextMessageByteArray = it.next().message();
       }

If I start the consumer first and then restart the producer thread, sending
the messages for topic X  then consumer is able to receive the messages.

>From kafka docs the high-level  consumer thread does long polling till the
message is available.

What is wrong I'm doing? Any idea to get around the problem.

thanks!

-- 
Kind Regards,
Shafaq

Re: 0.8 Head: Simple consumer no receiving messages

Posted by Guozhang Wang <wa...@gmail.com>.
The default behavior of the consumer is to consume from the tail of the
partition log, did you continue processing data after the consumers have
created their streams?

Guozhang


On Tue, Oct 29, 2013 at 10:52 AM, Shafaq <s....@gmail.com> wrote:

> Hi,
>    I see the following scenario:
>
> 1. Send messages under some topic X, able to see the log folder in Kafka
> Broker with name X-0 (Zeroth partition) and having files xxx.log and
> xxx.index under them. So guess this is fine
>
> 2. THen I fire up the consumer for topic X, it is able to find two streams
> (mapping to two partitions I have defined).
>
>
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumer
>                 .createMessageStreams(topicCountMap);
>         List<KafkaStream<byte[], byte[]>> streams = null;
>         Iterator<String> it = topicCountMap.keySet().iterator();
>         int threadNumber= 0;
>         while(it.hasNext()) {
>             String topic = it.next();
>             streams = consumerMap.get(topic);
>             for (KafkaStream stream : streams) {
>                 System.out.println("threadNo =" + threadNumber + "  for
> topic = " + topic );
>                 new ConsumerThreadRunnable(stream, threadNumber, topic));
>                 threadNumber++;
>             }
>         }
>
> However I don't get any messages in the CounsmerTHreanRunnable here
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>         while (it.hasNext() ) {
>             byte[] nextMessageByteArray = it.next().message();
>        }
>
> If I start the consumer first and then restart the producer thread, sending
> the messages for topic X  then consumer is able to receive the messages.
>
> From kafka docs the high-level  consumer thread does long polling till the
> message is available.
>
> What is wrong I'm doing? Any idea to get around the problem.
>
> thanks!
>
> --
> Kind Regards,
> Shafaq
>



-- 
-- Guozhang