Posted to users@kafka.apache.org by zl...@sina.com on 2012/03/02 13:17:10 UTC

Can the high-level consumer API fetch messages without blocking?

Hi,
     I use the high-level consumer API to get messages, but I want to do it in a non-blocking way. Below is my code:
 
    ConsumerConnector consumer =
        kafka.consumer.Consumer.createJavaConsumerConnector(consumerconfig);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("topic", new Integer(1));
    Map<String, List<KafkaMessageStream<Message>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    KafkaMessageStream<Message> stream = consumerMap.get("topic").get(0);
    ConsumerIterator<Message> it = stream.iterator();
    while (it.hasNext()) {
        ByteBuffer buffer = it.next().payload();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        System.out.println(new String(bytes));
    }

The problem is that when there are no messages, the program blocks at "it.hasNext()", but I want to break out of the loop. Is there a way to do this?
Thanks!

Re: Can the high-level consumer API fetch messages without blocking?

Posted by Jun Rao <ju...@gmail.com>.
On the consumer side, you can set consumer.timeout.ms to a positive value.
Then the iterator will stop blocking once the timeout is reached if no
message arrives in time. If you want to use the same iterator again, you
likely have to use trunk, since we recently fixed a bug (KAFKA-241).

Thanks,

Jun
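
The timeout behavior described above can be illustrated with a minimal sketch that uses only the JDK, not the Kafka API. The names TimedIterator and IteratorTimeoutException are hypothetical stand-ins for the consumer iterator and its timeout exception (in Kafka the latter is, as far as I know, kafka.consumer.ConsumerTimeoutException), and the constructor argument plays the role of consumer.timeout.ms. The sketch assumes a negative timeout means "block indefinitely" and shows that the iterator stays usable after a timeout.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the timeout exception raised by the consumer iterator.
class IteratorTimeoutException extends RuntimeException {
    IteratorTimeoutException(String message) {
        super(message);
    }
}

// Hypothetical iterator that waits at most timeoutMs for the next element.
class TimedIterator<T> implements Iterator<T> {
    private final BlockingQueue<T> queue;
    private final long timeoutMs; // negative: block indefinitely (assumption)
    private T nextItem;           // element buffered by hasNext(), if any

    TimedIterator(BlockingQueue<T> queue, long timeoutMs) {
        this.queue = queue;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public boolean hasNext() {
        if (nextItem != null) {
            return true;
        }
        try {
            nextItem = (timeoutMs < 0)
                ? queue.take()
                : queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (nextItem == null) {
            // Nothing arrived in time; no state was changed, so the
            // iterator can be used again after the caller handles this.
            throw new IteratorTimeoutException("no message within " + timeoutMs + " ms");
        }
        return true;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T item = nextItem;
        nextItem = null;
        return item;
    }
}

class TimedIteratorDemo {
    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("m1");
        queue.add("m2");
        TimedIterator<String> it = new TimedIterator<>(queue, 100);
        try {
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        } catch (IteratorTimeoutException e) {
            // The loop ends here instead of blocking forever.
            System.out.println("timed out, leaving the loop");
        }
    }
}
```

With the real consumer, the same pattern would presumably be a try/catch around the while (it.hasNext()) loop from the original post, with consumer.timeout.ms set in the consumer configuration.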


Re: Can the high-level consumer API fetch messages without blocking?

Posted by Milind Parikh <mi...@gmail.com>.
Setting consumer.timeout.ms to a value greater than 0 will cause a timeout exception to be thrown on the consumer.

