Posted to users@kafka.apache.org by S Ahmed <sa...@gmail.com> on 2014/01/06 02:53:43 UTC

Why does the high level consumer block, or rather where does it?

I'm trying to trace through the codebase and figure out exactly where the
high level consumer blocks.

public void run() {
    ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
    while (it.hasNext())
        System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
    System.out.println("Shutting down Thread: " + m_threadNumber);
}
Reference:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

So from what I understand, the while (it.hasNext()) loop will block if there
are no new messages for this particular topic/partition, correct?

Just to understand, can someone clarify where in the Kafka source this
block occurs? My assumption is that the broker this consumer is connected to
keeps a socket connection open to the consumer, blocks until a new message
owned by this consumer thread arrives, and then pushes it to the consumer
to process.

Is it at the iterator level somewhere?
https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/ConsumerIterator.scala

Re: Why does the high level consumer block, or rather where does it?

Posted by Jun Rao <ju...@gmail.com>.
Yes, ConsumerIterator blocks when there is no new message. This is done by
calling take() on a blocking queue.

Thanks,

Jun
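
To illustrate the mechanism described above, here is a minimal, self-contained
sketch of an iterator whose hasNext() blocks on BlockingQueue.take(). It is not
the Kafka source: the class names, the String payload, the SHUTDOWN sentinel,
and the in-process "fetcher" thread are all hypothetical stand-ins chosen for
the example. It only shows why a loop like while (it.hasNext()) can sit idle
on the consumer side until something is placed on the queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingIteratorSketch {

    // Hypothetical sentinel object; putting it on the queue ends the iteration.
    private static final String SHUTDOWN = new String("<shutdown>");

    /** Iterator whose hasNext() blocks on queue.take() until data or shutdown arrives. */
    static final class QueueIterator implements Iterator<String> {
        private final BlockingQueue<String> queue;
        private String next;   // element fetched by hasNext() but not yet returned
        private boolean done;  // true once the sentinel was seen or we were interrupted

        QueueIterator(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        @Override
        public boolean hasNext() {
            if (done) return false;
            if (next != null) return true;
            try {
                String item = queue.take();  // the calling thread blocks here while the queue is empty
                if (item == SHUTDOWN) {      // identity check against the sentinel instance
                    done = true;
                    return false;
                }
                next = item;
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve the interrupt flag
                done = true;
                return false;
            }
        }

        @Override
        public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            String item = next;
            next = null;
            return item;
        }
    }

    /** Runs a stand-in fetcher thread and returns what the blocking iterator consumed. */
    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Stand-in for a background fetcher: it delivers messages with a delay,
        // so the consuming thread really does wait inside take().
        Thread fetcher = new Thread(() -> {
            try {
                for (String m : new String[] {"m1", "m2"}) {
                    Thread.sleep(100);
                    queue.put(m);
                }
                queue.put(SHUTDOWN);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        fetcher.start();

        List<String> consumed = new ArrayList<>();
        Iterator<String> it = new QueueIterator(queue);
        while (it.hasNext()) {  // same loop shape as in the question
            consumed.add(it.next());
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the waiting happens entirely inside the consuming process, on
the in-memory queue. Nothing in the loop itself talks to a socket; a separate
thread is assumed to fill the queue.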

