You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Nikos Michalakis <ni...@knewton.com> on 2012/06/30 05:13:25 UTC

issue with kafka consumer on 0.7.1 kafka: during a load test all consumer threads are waiting even though messages are in kafka.

Hi everyone,

We 've been running some load tests where we have a single producer, single
broker, 2 consumers.

All of our consumers seem to be stuck on this state, which looks like they
are waiting to take from the queue (thread dump sample below).

The test ends, all messages are sent by the producer and then after we
restart the consumer it then fetches everything.

I would appreciate any leads in debugging this issue.

many thanks,
nikos

2012-06-30T02:00:02.91862 "KafkaConsumerServiceWorker" prio=10
tid=0x00007fb77c0d7000 nid=0x3f80 waiting on condition [0x00007fb7888bb000]
2012-06-30T02:00:02.91863    java.lang.Thread.State: WAITING (parking)
2012-06-30T02:00:02.91863 at sun.misc.Unsafe.park(Native Method)
2012-06-30T02:00:02.91864 - parking to wait for  <0x00000007001d0a88> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
2012-06-30T02:00:02.91866 at
java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
2012-06-30T02:00:02.91866 at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
2012-06-30T02:00:02.91867 at
java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
2012-06-30T02:00:02.91867 at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
2012-06-30T02:00:02.91868 at
kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
2012-06-30T02:00:02.91869 at
kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
2012-06-30T02:00:02.91869 at
kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
2012-06-30T02:00:02.91870 at
kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:36)
2012-06-30T02:00:02.91870 at
kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:43)
2012-06-30T02:00:02.91871 at
com.knewton.haag.client.kafka.KafkaConsumerServiceWorker.doRun(KafkaConsumerServiceWorker.java:59)
2012-06-30T02:00:02.91872 at
com.knewton.haag.client.kafka.KafkaConsumerServiceWorker.run(KafkaConsumerServiceWorker.java:43)
2012-06-30T02:00:02.91872 at
com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52)
2012-06-30T02:00:02.91874 at java.lang.Thread.run(Thread.java:662)
2012-06-30T02:00:02.91874

Re: issue with kafka consumer on 0.7.1 kafka: during a load test all consumer threads are waiting even though messages are in kafka.

Posted by Jun Rao <ju...@gmail.com>.
Nikos,

You can enable trace level logging in FetcherRunnable in the consumer and
see if any fetch requests are being made.

Thanks,

Jun

On Fri, Jun 29, 2012 at 8:13 PM, Nikos Michalakis <ni...@knewton.com> wrote:

> Hi everyone,
>
> We 've been running some load tests where we have a single producer, single
> broker, 2 consumers.
>
> All of our consumers seem to be stuck on this state, which looks like they
> are waiting to take from the queue (thread dump sample below).
>
> The test ends, all messages are sent by the producer and then after we
> restart the consumer it then fetches everything.
>
> I would appreciate any leads in debugging this issue.
>
> many thanks,
> nikos
>
> 2012-06-30T02:00:02.91862 "KafkaConsumerServiceWorker" prio=10
> tid=0x00007fb77c0d7000 nid=0x3f80 waiting on condition [0x00007fb7888bb000]
> 2012-06-30T02:00:02.91863    java.lang.Thread.State: WAITING (parking)
> 2012-06-30T02:00:02.91863 at sun.misc.Unsafe.park(Native Method)
> 2012-06-30T02:00:02.91864 - parking to wait for  <0x00000007001d0a88> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> 2012-06-30T02:00:02.91866 at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> 2012-06-30T02:00:02.91866 at
>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> 2012-06-30T02:00:02.91867 at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> 2012-06-30T02:00:02.91867 at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> 2012-06-30T02:00:02.91868 at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> 2012-06-30T02:00:02.91869 at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> 2012-06-30T02:00:02.91869 at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> 2012-06-30T02:00:02.91870 at
> kafka.utils.IteratorTemplate.next(IteratorTemplate.scala:36)
> 2012-06-30T02:00:02.91870 at
> kafka.consumer.ConsumerIterator.next(ConsumerIterator.scala:43)
> 2012-06-30T02:00:02.91871 at
>
> com.knewton.haag.client.kafka.KafkaConsumerServiceWorker.doRun(KafkaConsumerServiceWorker.java:59)
> 2012-06-30T02:00:02.91872 at
>
> com.knewton.haag.client.kafka.KafkaConsumerServiceWorker.run(KafkaConsumerServiceWorker.java:43)
> 2012-06-30T02:00:02.91872 at
>
> com.google.common.util.concurrent.AbstractExecutionThreadService$1$1.run(AbstractExecutionThreadService.java:52)
> 2012-06-30T02:00:02.91874 at java.lang.Thread.run(Thread.java:662)
> 2012-06-30T02:00:02.91874
>