You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Nihit Purwar <np...@sprinklr.com> on 2013/07/09 13:18:36 UTC

Kafka consumer not consuming events

Hi,

We are using kafka-0.7.2 with zookeeper (3.4.5)

Our cluster configuration:
3 brokers on 3 different machines. Each broker machine has a zookeeper instance running as well.
We have 15 topics defined. We are trying to use them as queue (JMS like) by defining the same group across different kafka consumers.
On the consumer side, we are using High Level Consumer.

However we are seeing a weird behaviour.
One of our heavily used queue (event_queue) has 2 dedicated consumers listening to that queue only. 
This queue is defined with 150 partitions on each broker & the number of streams defined on the 2 dedicated consumers is 150.
After a while we see that most the consumer threads keep waiting for events and the lag keeps growing.
If we kill one of the dedicated consumers, then the other consumer starts getting messaging in a hurry.

Consumer had no Full GCs.

How we measure lag?
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group event_queue --zkconnect zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue

Around the time, the events stopped coming to the new consumer.. this was printed on the logs:

[INFO] zookeeper state changed (Disconnected)
[INFO] zookeeper state changed (Disconnected)
[INFO] zookeeper state changed (SyncConnected)
[INFO] zookeeper state changed (SyncConnected)

Config Overidden:
Consumer:
fetch.size=3MB
autooffset.reset=largest
autocommit.interval.ms=500
Producer:
maxMessageSize=3MB

Please let us know if we are doing some wrong OR facing some known issue here?

Thanks,
Nihit

Re: Kafka consumer not consuming events

Posted by Jun Rao <ju...@gmail.com>.
The weird part is this. If the consumers are consuming, the following
fetcher thread shouldn't be blocked on enqueuing the data. Could you turn
on TRACE level logging in kafka.server.KafkaRequestHandlers and if there is
any fetch requests issued to the broker when the consumer threads get stuck?

"FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
condition [0x00007fcb833eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006809e8000> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(
LinkedBlockingQueue.java:306)
        at kafka.consumer.PartitionTopicInfo.enqueue(
PartitionTopicInfo.scala:61)
        at kafka.consumer.FetcherRunnable$$anonfun$run$
5.apply(FetcherRunnable.scala:79)
        at kafka.consumer.FetcherRunnable$$anonfun$run$
5.apply(FetcherRunnable.scala:65)
        at scala.collection.LinearSeqOptimized$class.
foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)

Thanks

Jun


On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hi Jun,
>
> Thanks for helping out so far.
>
> As per your explanation we are doing exactly as you have mentioned in your
> workaround below.
> > A workaround is to use different consumer connectors, each consuming a
> > single topic.
>
>
> Here is the problem...
>
> We have a topic which gets a lot of events (around a million in a day), so
> this topic on the server has a high number of partitions, and we have
> dedicated consumers only listening to this topic and the processing time is
> in the order of 15-30 millis. So we are assured that our consumers are not
> slow in processing.
>
> Every now then, it so happens, that our consumers threads stalls and do
> not receive any events (as suggested in my previous email with the thread
> stack on idle threads) even though we can see the offset lag increasing for
> the consumers.
>
> We also noticed that if we force rebalance the consumers (either by
> starting a new consumer or killing an existing one) data starts to flow in
> again to these consumer threads. The consumers remains stable (processing
> events) for about 20-30 mins before the threads go idle again and the
> backlog starts growing. This happens in a cycle for us and we are not able
> to figure out the cause for events not flowing in.
>
> As a side note, we are also monitoring the GC cycles and there are hardly
> any.
>
> Please let us know if you need any additional details.
>
> Thanks
> Nihit.
>
>
> On 10-Jul-2013, at 8:30 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Ok. One of the issues is that when you have a consumer that consumes
> > multiple topics, if one of the consumer threads is slow in consuming
> > messages from one topic, it can block the consumption of other consumer
> > threads. This is because we use a shared fetcher to fetch all topics.
> There
> > is an in-memory queue per topic. If one of the queues is full, the
> fetcher
> > will block and can't put the data into other queues.
> >
> > A workaround is to use different consumer connectors, each consuming a
> > single topic.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com>
> wrote:
> >
> >> Hi Jun,
> >>
> >> Please see my comments inline again :)
> >>
> >> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
> >>
> >>> This indicates our in-memory queue is empty. So the consumer thread is
> >>> blocked.
> >>
> >> What should we do about this.
> >> As I mentioned in the previous mail, events are there to be consumed.
> >> Killing one consumer makes the other consumer consume events again.
> >>
> >>
> >>> What about the Kafka fetcher threads? Are they blocked on anything?
> >>
> >> One of the fetcher threads is blocked on putting to a queue, the other
> is
> >> sleeping.
> >> Please look below:
> >>
> >> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
> >> condition [0x00007fcb833eb000]
> >>   java.lang.Thread.State: WAITING (parking)
> >>        at sun.misc.Unsafe.park(Native Method)
> >>        - parking to wait for  <0x00000006809e8000> (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>        at
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>        at
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>        at
> >>
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >>        at
> >> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
> >>        at
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
> >>        at
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
> >>        at
> >>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> >>        at scala.collection.immutable.List.foreach(List.scala:45)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>
> >> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
> >> condition [0x00007fcb836ee000]
> >>   java.lang.Thread.State: TIMED_WAITING (sleeping)
> >>        at java.lang.Thread.sleep(Native Method)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
> >>
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
> >> wrote:
> >>>
> >>>> Hello Jun,
> >>>>
> >>>> Please see my comments inline.
> >>>>
> >>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
> >>>>
> >>>>> I assume that each consumer instance consumes all 15 topics.
> >>>> No, we kept dedicated consumer listening to the topic in question.
> >>>> We did this because this queue processes huge amounts of data.
> >>>>
> >>>>
> >>>>> Are all your
> >>>>> consumer threads alive? If one of your thread dies, it will
> eventually
> >>>>> block the consumption in other threads.
> >>>>
> >>>> Yes. We can see all the threads in the thread dump.
> >>>> We have ensured that the threads do not die due to an Exception.
> >>>>
> >>>> Please look at the stack trace below. We see all the threads waiting
> >> like
> >>>> this:
> >>>>
> >>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting
> on
> >>>> condition [0x00007efedae6d000]
> >>>>  java.lang.Thread.State: WAITING (parking)
> >>>>       at sun.misc.Unsafe.park(Native Method)
> >>>>       - parking to wait for  <0x0000000640248618> (a
> >>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>       at
> >>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>>>       at
> >>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>>>       at
> >>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>>>       at
> >>>>
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>>>       at
> >> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>>>       at
> >>>>
> >>
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>>>       at
> >>>>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>>>       at
> >>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>       at java.lang.Thread.run(Thread.java:662)
> >>>>
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>>
> >>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>>>
> >>>>>> Our cluster configuration:
> >>>>>> 3 brokers on 3 different machines. Each broker machine has a
> zookeeper
> >>>>>> instance running as well.
> >>>>>> We have 15 topics defined. We are trying to use them as queue (JMS
> >> like)
> >>>>>> by defining the same group across different kafka consumers.
> >>>>>> On the consumer side, we are using High Level Consumer.
> >>>>>>
> >>>>>> However we are seeing a weird behaviour.
> >>>>>> One of our heavily used queue (event_queue) has 2 dedicated
> consumers
> >>>>>> listening to that queue only.
> >>>>>> This queue is defined with 150 partitions on each broker & the
> number
> >> of
> >>>>>> streams defined on the 2 dedicated consumers is 150.
> >>>>>> After a while we see that most the consumer threads keep waiting for
> >>>>>> events and the lag keeps growing.
> >>>>>> If we kill one of the dedicated consumers, then the other consumer
> >>>> starts
> >>>>>> getting messaging in a hurry.
> >>>>>>
> >>>>>> Consumer had no Full GCs.
> >>>>>>
> >>>>>> How we measure lag?
> >>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >>>>>> event_queue --zkconnect
> >>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
> >>>> event_queue
> >>>>>>
> >>>>>> Around the time, the events stopped coming to the new consumer..
> this
> >>>> was
> >>>>>> printed on the logs:
> >>>>>>
> >>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>
> >>>>>> Config Overidden:
> >>>>>> Consumer:
> >>>>>> fetch.size=3MB
> >>>>>> autooffset.reset=largest
> >>>>>> autocommit.interval.ms=500
> >>>>>> Producer:
> >>>>>> maxMessageSize=3MB
> >>>>>>
> >>>>>> Please let us know if we are doing some wrong OR facing some known
> >> issue
> >>>>>> here?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Nihit
> >>>>
> >>>>
> >>
> >>
>
>

Re: Kafka consumer not consuming events

Posted by Jun Rao <ju...@gmail.com>.
For those partitions that are lagging, do you see fetch requests in the log?

Thanks,

Jun


On Fri, Jul 19, 2013 at 12:30 AM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hello Jun,
>
> Sorry for the delay in getting the logs.
> Here are the 3 logs from the 3 servers with trace level as suggested:
>
>
> https://docs.google.com/file/d/0B5etsywBa-bkQnBESUJzNV9yRWc/edit?usp=sharing
>
> Please have a look and let us know if you need anything else to further
> debug this problem.
>
> Thanks,
> Nihit
>
> On 11-Jul-2013, at 4:41 PM, Nihit Purwar <np...@sprinklr.com> wrote:
>
> > Hi Jun,
> >
> > I did put in only one topic while starting the consumer and have used
> the same API "createMessageStreams".
> > As for the trace level logs of kafka consumer, we will send that to you
> soon.
> >
> > Thanks again for replying.
> >
> > Nihit
> >
> > On 10-Jul-2013, at 10:38 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> >> Also, just so that we are on the same page. I assume that you used the
> >> following api. Did you just put in one topic in the topicCountMap?
> >> def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
> >> List[KafkaStream[Array[Byte],Array[Byte]]]]
> >>
> >> Thank,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <np...@sprinklr.com>
> wrote:
> >>
> >>> Hi Jun,
> >>>
> >>> Thanks for helping out so far.
> >>>
> >>> As per your explanation we are doing exactly as you have mentioned in
> your
> >>> workaround below.
> >>>> A workaround is to use different consumer connectors, each consuming a
> >>>> single topic.
> >>>
> >>>
> >>> Here is the problem...
> >>>
> >>> We have a topic which gets a lot of events (around a million in a
> day), so
> >>> this topic on the server has a high number of partitions, and we have
> >>> dedicated consumers only listening to this topic and the processing
> time is
> >>> in the order of 15-30 millis. So we are assured that our consumers are
> not
> >>> slow in processing.
> >>>
> >>> Every now then, it so happens, that our consumers threads stalls and do
> >>> not receive any events (as suggested in my previous email with the
> thread
> >>> stack on idle threads) even though we can see the offset lag
> increasing for
> >>> the consumers.
> >>>
> >>> We also noticed that if we force rebalance the consumers (either by
> >>> starting a new consumer or killing an existing one) data starts to
> flow in
> >>> again to these consumer threads. The consumers remains stable
> (processing
> >>> events) for about 20-30 mins before the threads go idle again and the
> >>> backlog starts growing. This happens in a cycle for us and we are not
> able
> >>> to figure out the cause for events not flowing in.
> >>>
> >>> As a side note, we are also monitoring the GC cycles and there are
> hardly
> >>> any.
> >>>
> >>> Please let us know if you need any additional details.
> >>>
> >>> Thanks
> >>> Nihit.
> >>>
> >>>
> >>> On 10-Jul-2013, at 8:30 PM, Jun Rao <ju...@gmail.com> wrote:
> >>>
> >>>> Ok. One of the issues is that when you have a consumer that consumes
> >>>> multiple topics, if one of the consumer threads is slow in consuming
> >>>> messages from one topic, it can block the consumption of other
> consumer
> >>>> threads. This is because we use a shared fetcher to fetch all topics.
> >>> There
> >>>> is an in-memory queue per topic. If one of the queues is full, the
> >>> fetcher
> >>>> will block and can't put the data into other queues.
> >>>>
> >>>> A workaround is to use different consumer connectors, each consuming a
> >>>> single topic.
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Jun
> >>>>
> >>>>
> >>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com>
> >>> wrote:
> >>>>
> >>>>> Hi Jun,
> >>>>>
> >>>>> Please see my comments inline again :)
> >>>>>
> >>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
> >>>>>
> >>>>>> This indicates our in-memory queue is empty. So the consumer thread
> is
> >>>>>> blocked.
> >>>>>
> >>>>> What should we do about this.
> >>>>> As I mentioned in the previous mail, events are there to be consumed.
> >>>>> Killing one consumer makes the other consumer consume events again.
> >>>>>
> >>>>>
> >>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
> >>>>>
> >>>>> One of the fetcher threads is blocked on putting to a queue, the
> other
> >>> is
> >>>>> sleeping.
> >>>>> Please look below:
> >>>>>
> >>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting
> on
> >>>>> condition [0x00007fcb833eb000]
> >>>>> java.lang.Thread.State: WAITING (parking)
> >>>>>      at sun.misc.Unsafe.park(Native Method)
> >>>>>      - parking to wait for  <0x00000006809e8000> (a
> >>>>>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>>      at
> >>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>>      at
> >>>>>
> >>>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>>      at
> >>>>>
> >>>
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >>>>>      at
> >>>>>
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
> >>>>>      at
> >>>>>
> >>>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
> >>>>>      at
> >>>>>
> >>>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
> >>>>>      at
> >>>>>
> >>>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> >>>>>      at scala.collection.immutable.List.foreach(List.scala:45)
> >>>>>      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>>>>
> >>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting
> on
> >>>>> condition [0x00007fcb836ee000]
> >>>>> java.lang.Thread.State: TIMED_WAITING (sleeping)
> >>>>>      at java.lang.Thread.sleep(Native Method)
> >>>>>      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
> >>>>>
> >>>>>>
> >>>>>> Thanks,
> >>>>>>
> >>>>>> Jun
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
> >>>>> wrote:
> >>>>>>
> >>>>>>> Hello Jun,
> >>>>>>>
> >>>>>>> Please see my comments inline.
> >>>>>>>
> >>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> I assume that each consumer instance consumes all 15 topics.
> >>>>>>> No, we kept dedicated consumer listening to the topic in question.
> >>>>>>> We did this because this queue processes huge amounts of data.
> >>>>>>>
> >>>>>>>
> >>>>>>>> Are all your
> >>>>>>>> consumer threads alive? If one of your thread dies, it will
> >>> eventually
> >>>>>>>> block the consumption in other threads.
> >>>>>>>
> >>>>>>> Yes. We can see all the threads in the thread dump.
> >>>>>>> We have ensured that the threads do not die due to an Exception.
> >>>>>>>
> >>>>>>> Please look at the stack trace below. We see all the threads
> waiting
> >>>>> like
> >>>>>>> this:
> >>>>>>>
> >>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9
> waiting
> >>> on
> >>>>>>> condition [0x00007efedae6d000]
> >>>>>>> java.lang.Thread.State: WAITING (parking)
> >>>>>>>     at sun.misc.Unsafe.park(Native Method)
> >>>>>>>     - parking to wait for  <0x0000000640248618> (a
> >>>>>>>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>>>>     at
> >>>>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>>>>     at
> >>>>>>>
> >>>>>
> >>>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>>>>     at
> >>>>>>>
> >>>>>
> >>>
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>>>>>>     at
> >>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>>>>>>     at
> >>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>>>>>>     at
> >>>>>>>
> >>>
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>>>>>>     at
> >>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>>>>>>     at
> >>>>>>>
> >>>>>
> >>>
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>>>>>>     at
> >>>>>>>
> >>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>>>>>>     at
> >>>>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>>>>     at
> >>>>>>>
> >>>>>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>>>>     at
> >>>>>>>
> >>>>>
> >>>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>>>>     at java.lang.Thread.run(Thread.java:662)
> >>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks,
> >>>>>>>>
> >>>>>>>> Jun
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <
> npurwar@sprinklr.com>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>>>>>>
> >>>>>>>>> Our cluster configuration:
> >>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
> >>> zookeeper
> >>>>>>>>> instance running as well.
> >>>>>>>>> We have 15 topics defined. We are trying to use them as queue
> (JMS
> >>>>> like)
> >>>>>>>>> by defining the same group across different kafka consumers.
> >>>>>>>>> On the consumer side, we are using High Level Consumer.
> >>>>>>>>>
> >>>>>>>>> However we are seeing a weird behaviour.
> >>>>>>>>> One of our heavily used queue (event_queue) has 2 dedicated
> >>> consumers
> >>>>>>>>> listening to that queue only.
> >>>>>>>>> This queue is defined with 150 partitions on each broker & the
> >>> number
> >>>>> of
> >>>>>>>>> streams defined on the 2 dedicated consumers is 150.
> >>>>>>>>> After a while we see that most the consumer threads keep waiting
> for
> >>>>>>>>> events and the lag keeps growing.
> >>>>>>>>> If we kill one of the dedicated consumers, then the other
> consumer
> >>>>>>> starts
> >>>>>>>>> getting messaging in a hurry.
> >>>>>>>>>
> >>>>>>>>> Consumer had no Full GCs.
> >>>>>>>>>
> >>>>>>>>> How we measure lag?
> >>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >>>>>>>>> event_queue --zkconnect
> >>>>>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
> >>>>>>> event_queue
> >>>>>>>>>
> >>>>>>>>> Around the time, the events stopped coming to the new consumer..
> >>> this
> >>>>>>> was
> >>>>>>>>> printed on the logs:
> >>>>>>>>>
> >>>>>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>>>>
> >>>>>>>>> Config Overidden:
> >>>>>>>>> Consumer:
> >>>>>>>>> fetch.size=3MB
> >>>>>>>>> autooffset.reset=largest
> >>>>>>>>> autocommit.interval.ms=500
> >>>>>>>>> Producer:
> >>>>>>>>> maxMessageSize=3MB
> >>>>>>>>>
> >>>>>>>>> Please let us know if we are doing some wrong OR facing some
> known
> >>>>> issue
> >>>>>>>>> here?
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Nihit
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>>
> >
>
>

Re: Kafka consumer not consuming events

Posted by Nihit Purwar <np...@sprinklr.com>.
Hello Jun,

Sorry for the delay in getting the logs.
Here are the 3 logs from the 3 servers with trace level as suggested:

https://docs.google.com/file/d/0B5etsywBa-bkQnBESUJzNV9yRWc/edit?usp=sharing

Please have a look and let us know if you need anything else to further debug this problem.

Thanks,
Nihit

On 11-Jul-2013, at 4:41 PM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hi Jun,
> 
> I did put in only one topic while starting the consumer and have used the same API "createMessageStreams".
> As for the trace level logs of kafka consumer, we will send that to you soon.
> 
> Thanks again for replying.
> 
> Nihit
> 
> On 10-Jul-2013, at 10:38 PM, Jun Rao <ju...@gmail.com> wrote:
> 
>> Also, just so that we are on the same page. I assume that you used the
>> following api. Did you just put in one topic in the topicCountMap?
>> def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
>> List[KafkaStream[Array[Byte],Array[Byte]]]]
>> 
>> Thank,
>> 
>> Jun
>> 
>> 
>> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <np...@sprinklr.com> wrote:
>> 
>>> Hi Jun,
>>> 
>>> Thanks for helping out so far.
>>> 
>>> As per your explanation we are doing exactly as you have mentioned in your
>>> workaround below.
>>>> A workaround is to use different consumer connectors, each consuming a
>>>> single topic.
>>> 
>>> 
>>> Here is the problem...
>>> 
>>> We have a topic which gets a lot of events (around a million in a day), so
>>> this topic on the server has a high number of partitions, and we have
>>> dedicated consumers only listening to this topic and the processing time is
>>> in the order of 15-30 millis. So we are assured that our consumers are not
>>> slow in processing.
>>> 
>>> Every now then, it so happens, that our consumers threads stalls and do
>>> not receive any events (as suggested in my previous email with the thread
>>> stack on idle threads) even though we can see the offset lag increasing for
>>> the consumers.
>>> 
>>> We also noticed that if we force rebalance the consumers (either by
>>> starting a new consumer or killing an existing one) data starts to flow in
>>> again to these consumer threads. The consumers remains stable (processing
>>> events) for about 20-30 mins before the threads go idle again and the
>>> backlog starts growing. This happens in a cycle for us and we are not able
>>> to figure out the cause for events not flowing in.
>>> 
>>> As a side note, we are also monitoring the GC cycles and there are hardly
>>> any.
>>> 
>>> Please let us know if you need any additional details.
>>> 
>>> Thanks
>>> Nihit.
>>> 
>>> 
>>> On 10-Jul-2013, at 8:30 PM, Jun Rao <ju...@gmail.com> wrote:
>>> 
>>>> Ok. One of the issues is that when you have a consumer that consumes
>>>> multiple topics, if one of the consumer threads is slow in consuming
>>>> messages from one topic, it can block the consumption of other consumer
>>>> threads. This is because we use a shared fetcher to fetch all topics.
>>> There
>>>> is an in-memory queue per topic. If one of the queues is full, the
>>> fetcher
>>>> will block and can't put the data into other queues.
>>>> 
>>>> A workaround is to use different consumer connectors, each consuming a
>>>> single topic.
>>>> 
>>>> Thanks,
>>>> 
>>>> Jun
>>>> 
>>>> 
>>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com>
>>> wrote:
>>>> 
>>>>> Hi Jun,
>>>>> 
>>>>> Please see my comments inline again :)
>>>>> 
>>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
>>>>> 
>>>>>> This indicates our in-memory queue is empty. So the consumer thread is
>>>>>> blocked.
>>>>> 
>>>>> What should we do about this.
>>>>> As I mentioned in the previous mail, events are there to be consumed.
>>>>> Killing one consumer makes the other consumer consume events again.
>>>>> 
>>>>> 
>>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>>> 
>>>>> One of the fetcher threads is blocked on putting to a queue, the other
>>> is
>>>>> sleeping.
>>>>> Please look below:
>>>>> 
>>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
>>>>> condition [0x00007fcb833eb000]
>>>>> java.lang.Thread.State: WAITING (parking)
>>>>>      at sun.misc.Unsafe.park(Native Method)
>>>>>      - parking to wait for  <0x00000006809e8000> (a
>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>      at
>>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>      at
>>>>> 
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>      at
>>>>> 
>>> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>>>      at
>>>>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>>>      at
>>>>> 
>>> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>>>      at
>>>>> 
>>> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>>>      at
>>>>> 
>>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>>>      at scala.collection.immutable.List.foreach(List.scala:45)
>>>>>      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>>> 
>>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
>>>>> condition [0x00007fcb836ee000]
>>>>> java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>>      at java.lang.Thread.sleep(Native Method)
>>>>>      at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>>> 
>>>>>> 
>>>>>> Thanks,
>>>>>> 
>>>>>> Jun
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
>>>>> wrote:
>>>>>> 
>>>>>>> Hello Jun,
>>>>>>> 
>>>>>>> Please see my comments inline.
>>>>>>> 
>>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
>>>>>>> 
>>>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>>> No, we kept dedicated consumer listening to the topic in question.
>>>>>>> We did this because this queue processes huge amounts of data.
>>>>>>> 
>>>>>>> 
>>>>>>>> Are all your
>>>>>>>> consumer threads alive? If one of your thread dies, it will
>>> eventually
>>>>>>>> block the consumption in other threads.
>>>>>>> 
>>>>>>> Yes. We can see all the threads in the thread dump.
>>>>>>> We have ensured that the threads do not die due to an Exception.
>>>>>>> 
>>>>>>> Please look at the stack trace below. We see all the threads waiting
>>>>> like
>>>>>>> this:
>>>>>>> 
>>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting
>>> on
>>>>>>> condition [0x00007efedae6d000]
>>>>>>> java.lang.Thread.State: WAITING (parking)
>>>>>>>     at sun.misc.Unsafe.park(Native Method)
>>>>>>>     - parking to wait for  <0x0000000640248618> (a
>>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>>     at
>>>>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>>     at
>>>>>>> 
>>>>> 
>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>>     at
>>>>>>> 
>>>>> 
>>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>>>     at
>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>>>     at
>>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>>>     at
>>>>>>> 
>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>>     at
>>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>>     at
>>>>>>> 
>>>>> 
>>> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>>>     at
>>>>>>> 
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>>     at
>>>>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>>>     at
>>>>>>> 
>>>>> 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>>     at
>>>>>>> 
>>>>> 
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>>     at java.lang.Thread.run(Thread.java:662)
>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> 
>>>>>>>> Jun
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>>>>>>> 
>>>>>>>>> Our cluster configuration:
>>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
>>> zookeeper
>>>>>>>>> instance running as well.
>>>>>>>>> We have 15 topics defined. We are trying to use them as queue (JMS
>>>>> like)
>>>>>>>>> by defining the same group across different kafka consumers.
>>>>>>>>> On the consumer side, we are using High Level Consumer.
>>>>>>>>> 
>>>>>>>>> However we are seeing a weird behaviour.
>>>>>>>>> One of our heavily used queue (event_queue) has 2 dedicated
>>> consumers
>>>>>>>>> listening to that queue only.
>>>>>>>>> This queue is defined with 150 partitions on each broker & the
>>> number
>>>>> of
>>>>>>>>> streams defined on the 2 dedicated consumers is 150.
>>>>>>>>> After a while we see that most the consumer threads keep waiting for
>>>>>>>>> events and the lag keeps growing.
>>>>>>>>> If we kill one of the dedicated consumers, then the other consumer
>>>>>>> starts
>>>>>>>>> getting messaging in a hurry.
>>>>>>>>> 
>>>>>>>>> Consumer had no Full GCs.
>>>>>>>>> 
>>>>>>>>> How we measure lag?
>>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>>>>>>>>> event_queue --zkconnect
>>>>>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
>>>>>>> event_queue
>>>>>>>>> 
>>>>>>>>> Around the time, the events stopped coming to the new consumer..
>>> this
>>>>>>> was
>>>>>>>>> printed on the logs:
>>>>>>>>> 
>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>>> 
>>>>>>>>> Config Overidden:
>>>>>>>>> Consumer:
>>>>>>>>> fetch.size=3MB
>>>>>>>>> autooffset.reset=largest
>>>>>>>>> autocommit.interval.ms=500
>>>>>>>>> Producer:
>>>>>>>>> maxMessageSize=3MB
>>>>>>>>> 
>>>>>>>>> Please let us know if we are doing some wrong OR facing some known
>>>>> issue
>>>>>>>>> here?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Nihit
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 


Re: Kafka consumer not consuming events

Posted by Nihit Purwar <np...@sprinklr.com>.
Hi Jun,

I did put in only one topic while starting the consumer and have used the same API "createMessageStreams".
As for the trace level logs of kafka consumer, we will send that to you soon.

Thanks again for replying.

Nihit

On 10-Jul-2013, at 10:38 PM, Jun Rao <ju...@gmail.com> wrote:

> Also, just so that we are on the same page. I assume that you used the
> following api. Did you just put in one topic in the topicCountMap?
>  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
> List[KafkaStream[Array[Byte],Array[Byte]]]]
> 
> Thank,
> 
> Jun
> 
> 
> On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <np...@sprinklr.com> wrote:
> 
>> Hi Jun,
>> 
>> Thanks for helping out so far.
>> 
>> As per your explanation we are doing exactly as you have mentioned in your
>> workaround below.
>>> A workaround is to use different consumer connectors, each consuming a
>>> single topic.
>> 
>> 
>> Here is the problem...
>> 
>> We have a topic which gets a lot of events (around a million in a day), so
>> this topic on the server has a high number of partitions, and we have
>> dedicated consumers only listening to this topic and the processing time is
>> in the order of 15-30 millis. So we are assured that our consumers are not
>> slow in processing.
>> 
>> Every now then, it so happens, that our consumers threads stalls and do
>> not receive any events (as suggested in my previous email with the thread
>> stack on idle threads) even though we can see the offset lag increasing for
>> the consumers.
>> 
>> We also noticed that if we force rebalance the consumers (either by
>> starting a new consumer or killing an existing one) data starts to flow in
>> again to these consumer threads. The consumers remains stable (processing
>> events) for about 20-30 mins before the threads go idle again and the
>> backlog starts growing. This happens in a cycle for us and we are not able
>> to figure out the cause for events not flowing in.
>> 
>> As a side note, we are also monitoring the GC cycles and there are hardly
>> any.
>> 
>> Please let us know if you need any additional details.
>> 
>> Thanks
>> Nihit.
>> 
>> 
>> On 10-Jul-2013, at 8:30 PM, Jun Rao <ju...@gmail.com> wrote:
>> 
>>> Ok. One of the issues is that when you have a consumer that consumes
>>> multiple topics, if one of the consumer threads is slow in consuming
>>> messages from one topic, it can block the consumption of other consumer
>>> threads. This is because we use a shared fetcher to fetch all topics.
>> There
>>> is an in-memory queue per topic. If one of the queues is full, the
>> fetcher
>>> will block and can't put the data into other queues.
>>> 
>>> A workaround is to use different consumer connectors, each consuming a
>>> single topic.
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com>
>> wrote:
>>> 
>>>> Hi Jun,
>>>> 
>>>> Please see my comments inline again :)
>>>> 
>>>> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
>>>> 
>>>>> This indicates our in-memory queue is empty. So the consumer thread is
>>>>> blocked.
>>>> 
>>>> What should we do about this.
>>>> As I mentioned in the previous mail, events are there to be consumed.
>>>> Killing one consumer makes the other consumer consume events again.
>>>> 
>>>> 
>>>>> What about the Kafka fetcher threads? Are they blocked on anything?
>>>> 
>>>> One of the fetcher threads is blocked on putting to a queue, the other
>> is
>>>> sleeping.
>>>> Please look below:
>>>> 
>>>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
>>>> condition [0x00007fcb833eb000]
>>>>  java.lang.Thread.State: WAITING (parking)
>>>>       at sun.misc.Unsafe.park(Native Method)
>>>>       - parking to wait for  <0x00000006809e8000> (a
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>       at
>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>       at
>>>> 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>       at
>>>> 
>> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>>>       at
>>>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>>>       at
>>>> 
>> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>>>       at
>>>> 
>> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>>>       at
>>>> 
>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>>>       at scala.collection.immutable.List.foreach(List.scala:45)
>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>>>> 
>>>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
>>>> condition [0x00007fcb836ee000]
>>>>  java.lang.Thread.State: TIMED_WAITING (sleeping)
>>>>       at java.lang.Thread.sleep(Native Method)
>>>>       at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>>>> 
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> 
>>>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
>>>> wrote:
>>>>> 
>>>>>> Hello Jun,
>>>>>> 
>>>>>> Please see my comments inline.
>>>>>> 
>>>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
>>>>>> 
>>>>>>> I assume that each consumer instance consumes all 15 topics.
>>>>>> No, we kept dedicated consumer listening to the topic in question.
>>>>>> We did this because this queue processes huge amounts of data.
>>>>>> 
>>>>>> 
>>>>>>> Are all your
>>>>>>> consumer threads alive? If one of your thread dies, it will
>> eventually
>>>>>>> block the consumption in other threads.
>>>>>> 
>>>>>> Yes. We can see all the threads in the thread dump.
>>>>>> We have ensured that the threads do not die due to an Exception.
>>>>>> 
>>>>>> Please look at the stack trace below. We see all the threads waiting
>>>> like
>>>>>> this:
>>>>>> 
>>>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting
>> on
>>>>>> condition [0x00007efedae6d000]
>>>>>> java.lang.Thread.State: WAITING (parking)
>>>>>>      at sun.misc.Unsafe.park(Native Method)
>>>>>>      - parking to wait for  <0x0000000640248618> (a
>>>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>>>      at
>>>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>>>      at
>>>>>> 
>>>> 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>>>      at
>>>>>> 
>>>> 
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>>>      at
>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>>>      at
>>>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>>>      at
>>>>>> 
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>>>      at
>>>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>>>      at
>>>>>> 
>>>> 
>> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>>>      at
>>>>>> 
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>>>      at
>>>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>>>      at
>>>>>> 
>>>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>>>      at
>>>>>> 
>>>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>>>      at java.lang.Thread.run(Thread.java:662)
>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
>>>>>> wrote:
>>>>>>> 
>>>>>>>> Hi,
>>>>>>>> 
>>>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>>>>>> 
>>>>>>>> Our cluster configuration:
>>>>>>>> 3 brokers on 3 different machines. Each broker machine has a
>> zookeeper
>>>>>>>> instance running as well.
>>>>>>>> We have 15 topics defined. We are trying to use them as queue (JMS
>>>> like)
>>>>>>>> by defining the same group across different kafka consumers.
>>>>>>>> On the consumer side, we are using High Level Consumer.
>>>>>>>> 
>>>>>>>> However we are seeing a weird behaviour.
>>>>>>>> One of our heavily used queue (event_queue) has 2 dedicated
>> consumers
>>>>>>>> listening to that queue only.
>>>>>>>> This queue is defined with 150 partitions on each broker & the
>> number
>>>> of
>>>>>>>> streams defined on the 2 dedicated consumers is 150.
>>>>>>>> After a while we see that most the consumer threads keep waiting for
>>>>>>>> events and the lag keeps growing.
>>>>>>>> If we kill one of the dedicated consumers, then the other consumer
>>>>>> starts
>>>>>>>> getting messaging in a hurry.
>>>>>>>> 
>>>>>>>> Consumer had no Full GCs.
>>>>>>>> 
>>>>>>>> How we measure lag?
>>>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>>>>>>>> event_queue --zkconnect
>>>>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
>>>>>> event_queue
>>>>>>>> 
>>>>>>>> Around the time, the events stopped coming to the new consumer..
>> this
>>>>>> was
>>>>>>>> printed on the logs:
>>>>>>>> 
>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>>>> 
>>>>>>>> Config Overidden:
>>>>>>>> Consumer:
>>>>>>>> fetch.size=3MB
>>>>>>>> autooffset.reset=largest
>>>>>>>> autocommit.interval.ms=500
>>>>>>>> Producer:
>>>>>>>> maxMessageSize=3MB
>>>>>>>> 
>>>>>>>> Please let us know if we are doing some wrong OR facing some known
>>>> issue
>>>>>>>> here?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Nihit
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 


Re: Kafka consumer not consuming events

Posted by Jun Rao <ju...@gmail.com>.
Also, just so that we are on the same page. I assume that you used the
following api. Did you just put in one topic in the topicCountMap?
  def createMessageStreams(topicCountMap: Map[String,Int]): Map[String,
List[KafkaStream[Array[Byte],Array[Byte]]]]

Thank,

Jun


On Wed, Jul 10, 2013 at 8:30 AM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hi Jun,
>
> Thanks for helping out so far.
>
> As per your explanation we are doing exactly as you have mentioned in your
> workaround below.
> > A workaround is to use different consumer connectors, each consuming a
> > single topic.
>
>
> Here is the problem...
>
> We have a topic which gets a lot of events (around a million in a day), so
> this topic on the server has a high number of partitions, and we have
> dedicated consumers only listening to this topic and the processing time is
> in the order of 15-30 millis. So we are assured that our consumers are not
> slow in processing.
>
> Every now then, it so happens, that our consumers threads stalls and do
> not receive any events (as suggested in my previous email with the thread
> stack on idle threads) even though we can see the offset lag increasing for
> the consumers.
>
> We also noticed that if we force rebalance the consumers (either by
> starting a new consumer or killing an existing one) data starts to flow in
> again to these consumer threads. The consumers remains stable (processing
> events) for about 20-30 mins before the threads go idle again and the
> backlog starts growing. This happens in a cycle for us and we are not able
> to figure out the cause for events not flowing in.
>
> As a side note, we are also monitoring the GC cycles and there are hardly
> any.
>
> Please let us know if you need any additional details.
>
> Thanks
> Nihit.
>
>
> On 10-Jul-2013, at 8:30 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Ok. One of the issues is that when you have a consumer that consumes
> > multiple topics, if one of the consumer threads is slow in consuming
> > messages from one topic, it can block the consumption of other consumer
> > threads. This is because we use a shared fetcher to fetch all topics.
> There
> > is an in-memory queue per topic. If one of the queues is full, the
> fetcher
> > will block and can't put the data into other queues.
> >
> > A workaround is to use different consumer connectors, each consuming a
> > single topic.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com>
> wrote:
> >
> >> Hi Jun,
> >>
> >> Please see my comments inline again :)
> >>
> >> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
> >>
> >>> This indicates our in-memory queue is empty. So the consumer thread is
> >>> blocked.
> >>
> >> What should we do about this.
> >> As I mentioned in the previous mail, events are there to be consumed.
> >> Killing one consumer makes the other consumer consume events again.
> >>
> >>
> >>> What about the Kafka fetcher threads? Are they blocked on anything?
> >>
> >> One of the fetcher threads is blocked on putting to a queue, the other
> is
> >> sleeping.
> >> Please look below:
> >>
> >> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
> >> condition [0x00007fcb833eb000]
> >>   java.lang.Thread.State: WAITING (parking)
> >>        at sun.misc.Unsafe.park(Native Method)
> >>        - parking to wait for  <0x00000006809e8000> (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>        at
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>        at
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>        at
> >>
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
> >>        at
> >> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
> >>        at
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
> >>        at
> >>
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
> >>        at
> >>
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
> >>        at scala.collection.immutable.List.foreach(List.scala:45)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
> >>
> >> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
> >> condition [0x00007fcb836ee000]
> >>   java.lang.Thread.State: TIMED_WAITING (sleeping)
> >>        at java.lang.Thread.sleep(Native Method)
> >>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
> >>
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
> >> wrote:
> >>>
> >>>> Hello Jun,
> >>>>
> >>>> Please see my comments inline.
> >>>>
> >>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
> >>>>
> >>>>> I assume that each consumer instance consumes all 15 topics.
> >>>> No, we kept dedicated consumer listening to the topic in question.
> >>>> We did this because this queue processes huge amounts of data.
> >>>>
> >>>>
> >>>>> Are all your
> >>>>> consumer threads alive? If one of your thread dies, it will
> eventually
> >>>>> block the consumption in other threads.
> >>>>
> >>>> Yes. We can see all the threads in the thread dump.
> >>>> We have ensured that the threads do not die due to an Exception.
> >>>>
> >>>> Please look at the stack trace below. We see all the threads waiting
> >> like
> >>>> this:
> >>>>
> >>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting
> on
> >>>> condition [0x00007efedae6d000]
> >>>>  java.lang.Thread.State: WAITING (parking)
> >>>>       at sun.misc.Unsafe.park(Native Method)
> >>>>       - parking to wait for  <0x0000000640248618> (a
> >>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>>>       at
> >>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>>>       at
> >>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>>>       at
> >>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>>>       at
> >>>>
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>>>       at
> >> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>>>       at
> >>>>
> >>
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>>>       at
> >>>>
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>>>       at
> >>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>>>       at
> >>>>
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>>>       at java.lang.Thread.run(Thread.java:662)
> >>>>
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jun
> >>>>>
> >>>>>
> >>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
> >>>> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>>>
> >>>>>> Our cluster configuration:
> >>>>>> 3 brokers on 3 different machines. Each broker machine has a
> zookeeper
> >>>>>> instance running as well.
> >>>>>> We have 15 topics defined. We are trying to use them as queue (JMS
> >> like)
> >>>>>> by defining the same group across different kafka consumers.
> >>>>>> On the consumer side, we are using High Level Consumer.
> >>>>>>
> >>>>>> However we are seeing a weird behaviour.
> >>>>>> One of our heavily used queue (event_queue) has 2 dedicated
> consumers
> >>>>>> listening to that queue only.
> >>>>>> This queue is defined with 150 partitions on each broker & the
> number
> >> of
> >>>>>> streams defined on the 2 dedicated consumers is 150.
> >>>>>> After a while we see that most the consumer threads keep waiting for
> >>>>>> events and the lag keeps growing.
> >>>>>> If we kill one of the dedicated consumers, then the other consumer
> >>>> starts
> >>>>>> getting messaging in a hurry.
> >>>>>>
> >>>>>> Consumer had no Full GCs.
> >>>>>>
> >>>>>> How we measure lag?
> >>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >>>>>> event_queue --zkconnect
> >>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
> >>>> event_queue
> >>>>>>
> >>>>>> Around the time, the events stopped coming to the new consumer..
> this
> >>>> was
> >>>>>> printed on the logs:
> >>>>>>
> >>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>> [INFO] zookeeper state changed (Disconnected)
> >>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>>>
> >>>>>> Config Overidden:
> >>>>>> Consumer:
> >>>>>> fetch.size=3MB
> >>>>>> autooffset.reset=largest
> >>>>>> autocommit.interval.ms=500
> >>>>>> Producer:
> >>>>>> maxMessageSize=3MB
> >>>>>>
> >>>>>> Please let us know if we are doing some wrong OR facing some known
> >> issue
> >>>>>> here?
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Nihit
> >>>>
> >>>>
> >>
> >>
>
>

Re: Kafka consumer not consuming events

Posted by Nihit Purwar <np...@sprinklr.com>.
Hi Jun,

Thanks for helping out so far. 

As per your explanation we are doing exactly as you have mentioned in your workaround below.
> A workaround is to use different consumer connectors, each consuming a
> single topic.


Here is the problem...

We have a topic which gets a lot of events (around a million in a day), so this topic on the server has a high number of partitions, and we have dedicated consumers only listening to this topic and the processing time is in the order of 15-30 millis. So we are assured that our consumers are not slow in processing. 

Every now then, it so happens, that our consumers threads stalls and do not receive any events (as suggested in my previous email with the thread stack on idle threads) even though we can see the offset lag increasing for the consumers.

We also noticed that if we force rebalance the consumers (either by starting a new consumer or killing an existing one) data starts to flow in again to these consumer threads. The consumers remains stable (processing events) for about 20-30 mins before the threads go idle again and the backlog starts growing. This happens in a cycle for us and we are not able to figure out the cause for events not flowing in.

As a side note, we are also monitoring the GC cycles and there are hardly any.

Please let us know if you need any additional details. 

Thanks
Nihit.


On 10-Jul-2013, at 8:30 PM, Jun Rao <ju...@gmail.com> wrote:

> Ok. One of the issues is that when you have a consumer that consumes
> multiple topics, if one of the consumer threads is slow in consuming
> messages from one topic, it can block the consumption of other consumer
> threads. This is because we use a shared fetcher to fetch all topics. There
> is an in-memory queue per topic. If one of the queues is full, the fetcher
> will block and can't put the data into other queues.
> 
> A workaround is to use different consumer connectors, each consuming a
> single topic.
> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com> wrote:
> 
>> Hi Jun,
>> 
>> Please see my comments inline again :)
>> 
>> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
>> 
>>> This indicates our in-memory queue is empty. So the consumer thread is
>>> blocked.
>> 
>> What should we do about this.
>> As I mentioned in the previous mail, events are there to be consumed.
>> Killing one consumer makes the other consumer consume events again.
>> 
>> 
>>> What about the Kafka fetcher threads? Are they blocked on anything?
>> 
>> One of the fetcher threads is blocked on putting to a queue, the other is
>> sleeping.
>> Please look below:
>> 
>> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
>> condition [0x00007fcb833eb000]
>>   java.lang.Thread.State: WAITING (parking)
>>        at sun.misc.Unsafe.park(Native Method)
>>        - parking to wait for  <0x00000006809e8000> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>        at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>        at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>        at
>> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>>        at
>> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>>        at
>> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>>        at
>> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>>        at
>> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>>        at scala.collection.immutable.List.foreach(List.scala:45)
>>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>> 
>> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
>> condition [0x00007fcb836ee000]
>>   java.lang.Thread.State: TIMED_WAITING (sleeping)
>>        at java.lang.Thread.sleep(Native Method)
>>        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>> 
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
>> wrote:
>>> 
>>>> Hello Jun,
>>>> 
>>>> Please see my comments inline.
>>>> 
>>>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
>>>> 
>>>>> I assume that each consumer instance consumes all 15 topics.
>>>> No, we kept dedicated consumer listening to the topic in question.
>>>> We did this because this queue processes huge amounts of data.
>>>> 
>>>> 
>>>>> Are all your
>>>>> consumer threads alive? If one of your thread dies, it will eventually
>>>>> block the consumption in other threads.
>>>> 
>>>> Yes. We can see all the threads in the thread dump.
>>>> We have ensured that the threads do not die due to an Exception.
>>>> 
>>>> Please look at the stack trace below. We see all the threads waiting
>> like
>>>> this:
>>>> 
>>>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on
>>>> condition [0x00007efedae6d000]
>>>>  java.lang.Thread.State: WAITING (parking)
>>>>       at sun.misc.Unsafe.park(Native Method)
>>>>       - parking to wait for  <0x0000000640248618> (a
>>>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>>>       at
>>>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>>>       at
>>>> 
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>>>       at
>>>> 
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>>>       at
>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>>>       at
>>>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>>>       at
>>>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>>>       at
>> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>>>       at
>>>> 
>> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>>>       at
>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>>>       at
>>>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>>>       at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>>>       at
>>>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>>>       at
>>>> 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>>>       at java.lang.Thread.run(Thread.java:662)
>>>> 
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jun
>>>>> 
>>>>> 
>>>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>>>> 
>>>>>> Our cluster configuration:
>>>>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
>>>>>> instance running as well.
>>>>>> We have 15 topics defined. We are trying to use them as queue (JMS
>> like)
>>>>>> by defining the same group across different kafka consumers.
>>>>>> On the consumer side, we are using High Level Consumer.
>>>>>> 
>>>>>> However we are seeing a weird behaviour.
>>>>>> One of our heavily used queue (event_queue) has 2 dedicated consumers
>>>>>> listening to that queue only.
>>>>>> This queue is defined with 150 partitions on each broker & the number
>> of
>>>>>> streams defined on the 2 dedicated consumers is 150.
>>>>>> After a while we see that most the consumer threads keep waiting for
>>>>>> events and the lag keeps growing.
>>>>>> If we kill one of the dedicated consumers, then the other consumer
>>>> starts
>>>>>> getting messaging in a hurry.
>>>>>> 
>>>>>> Consumer had no Full GCs.
>>>>>> 
>>>>>> How we measure lag?
>>>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>>>>>> event_queue --zkconnect
>>>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
>>>> event_queue
>>>>>> 
>>>>>> Around the time, the events stopped coming to the new consumer.. this
>>>> was
>>>>>> printed on the logs:
>>>>>> 
>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>> [INFO] zookeeper state changed (Disconnected)
>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>> [INFO] zookeeper state changed (SyncConnected)
>>>>>> 
>>>>>> Config Overidden:
>>>>>> Consumer:
>>>>>> fetch.size=3MB
>>>>>> autooffset.reset=largest
>>>>>> autocommit.interval.ms=500
>>>>>> Producer:
>>>>>> maxMessageSize=3MB
>>>>>> 
>>>>>> Please let us know if we are doing some wrong OR facing some known
>> issue
>>>>>> here?
>>>>>> 
>>>>>> Thanks,
>>>>>> Nihit
>>>> 
>>>> 
>> 
>> 


Re: Kafka consumer not consuming events

Posted by Jun Rao <ju...@gmail.com>.
Ok. One of the issues is that when you have a consumer that consumes
multiple topics, if one of the consumer threads is slow in consuming
messages from one topic, it can block the consumption of other consumer
threads. This is because we use a shared fetcher to fetch all topics. There
is an in-memory queue per topic. If one of the queues is full, the fetcher
will block and can't put the data into other queues.

A workaround is to use different consumer connectors, each consuming a
single topic.

Thanks,

Jun


On Tue, Jul 9, 2013 at 11:12 PM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hi Jun,
>
> Please see my comments inline again :)
>
> On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > This indicates our in-memory queue is empty. So the consumer thread is
> > blocked.
>
> What should we do about this.
> As I mentioned in the previous mail, events are there to be consumed.
> Killing one consumer makes the other consumer consume events again.
>
>
> > What about the Kafka fetcher threads? Are they blocked on anything?
>
> One of the fetcher threads is blocked on putting to a queue, the other is
> sleeping.
> Please look below:
>
> "FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on
> condition [0x00007fcb833eb000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000006809e8000> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
>         at
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
>         at
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
>         at
> kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
>         at
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
>         at scala.collection.immutable.List.foreach(List.scala:45)
>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)
>
> "FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on
> condition [0x00007fcb836ee000]
>    java.lang.Thread.State: TIMED_WAITING (sleeping)
>         at java.lang.Thread.sleep(Native Method)
>         at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)
>
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com>
> wrote:
> >
> >> Hello Jun,
> >>
> >> Please see my comments inline.
> >>
> >> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
> >>
> >>> I assume that each consumer instance consumes all 15 topics.
> >> No, we kept dedicated consumer listening to the topic in question.
> >> We did this because this queue processes huge amounts of data.
> >>
> >>
> >>> Are all your
> >>> consumer threads alive? If one of your thread dies, it will eventually
> >>> block the consumption in other threads.
> >>
> >> Yes. We can see all the threads in the thread dump.
> >> We have ensured that the threads do not die due to an Exception.
> >>
> >> Please look at the stack trace below. We see all the threads waiting
> like
> >> this:
> >>
> >> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on
> >> condition [0x00007efedae6d000]
> >>   java.lang.Thread.State: WAITING (parking)
> >>        at sun.misc.Unsafe.park(Native Method)
> >>        - parking to wait for  <0x0000000640248618> (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >>        at
> >> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
> >>        at
> >>
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
> >>        at
> >>
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
> >>        at
> >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
> >>        at
> >> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
> >>        at
> >> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
> >>        at
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
> >>        at
> >>
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
> >>        at
> >> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
> >>        at
> >> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
> >>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
> >>        at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> >>        at
> >>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> >>        at java.lang.Thread.run(Thread.java:662)
> >>
> >>>
> >>> Thanks,
> >>>
> >>> Jun
> >>>
> >>>
> >>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
> >> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>>>
> >>>> Our cluster configuration:
> >>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
> >>>> instance running as well.
> >>>> We have 15 topics defined. We are trying to use them as queue (JMS
> like)
> >>>> by defining the same group across different kafka consumers.
> >>>> On the consumer side, we are using High Level Consumer.
> >>>>
> >>>> However we are seeing a weird behaviour.
> >>>> One of our heavily used queue (event_queue) has 2 dedicated consumers
> >>>> listening to that queue only.
> >>>> This queue is defined with 150 partitions on each broker & the number
> of
> >>>> streams defined on the 2 dedicated consumers is 150.
> >>>> After a while we see that most the consumer threads keep waiting for
> >>>> events and the lag keeps growing.
> >>>> If we kill one of the dedicated consumers, then the other consumer
> >> starts
> >>>> getting messaging in a hurry.
> >>>>
> >>>> Consumer had no Full GCs.
> >>>>
> >>>> How we measure lag?
> >>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >>>> event_queue --zkconnect
> >>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
> >> event_queue
> >>>>
> >>>> Around the time, the events stopped coming to the new consumer.. this
> >> was
> >>>> printed on the logs:
> >>>>
> >>>> [INFO] zookeeper state changed (Disconnected)
> >>>> [INFO] zookeeper state changed (Disconnected)
> >>>> [INFO] zookeeper state changed (SyncConnected)
> >>>> [INFO] zookeeper state changed (SyncConnected)
> >>>>
> >>>> Config Overidden:
> >>>> Consumer:
> >>>> fetch.size=3MB
> >>>> autooffset.reset=largest
> >>>> autocommit.interval.ms=500
> >>>> Producer:
> >>>> maxMessageSize=3MB
> >>>>
> >>>> Please let us know if we are doing some wrong OR facing some known
> issue
> >>>> here?
> >>>>
> >>>> Thanks,
> >>>> Nihit
> >>
> >>
>
>

Re: Kafka consumer not consuming events

Posted by Nihit Purwar <np...@sprinklr.com>.
Hi Jun,

Please see my comments inline again :)

On 10-Jul-2013, at 9:13 AM, Jun Rao <ju...@gmail.com> wrote:

> This indicates our in-memory queue is empty. So the consumer thread is
> blocked.

What should we do about this.
As I mentioned in the previous mail, events are there to be consumed.
Killing one consumer makes the other consumer consume events again.


> What about the Kafka fetcher threads? Are they blocked on anything?

One of the fetcher threads is blocked on putting to a queue, the other is sleeping.
Please look below:

"FetchRunnable-1" prio=10 tid=0x00007fcbc902b800 nid=0x2064 waiting on condition [0x00007fcb833eb000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000006809e8000> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:306)
        at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:61)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:79)
        at kafka.consumer.FetcherRunnable$$anonfun$run$5.apply(FetcherRunnable.scala:65)
        at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:61)
        at scala.collection.immutable.List.foreach(List.scala:45)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:65)

"FetchRunnable-0" prio=10 tid=0x00007fcbc833b800 nid=0x2063 waiting on condition [0x00007fcb836ee000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at kafka.consumer.FetcherRunnable.run(FetcherRunnable.scala:99)

> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com> wrote:
> 
>> Hello Jun,
>> 
>> Please see my comments inline.
>> 
>> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
>> 
>>> I assume that each consumer instance consumes all 15 topics.
>> No, we kept dedicated consumer listening to the topic in question.
>> We did this because this queue processes huge amounts of data.
>> 
>> 
>>> Are all your
>>> consumer threads alive? If one of your thread dies, it will eventually
>>> block the consumption in other threads.
>> 
>> Yes. We can see all the threads in the thread dump.
>> We have ensured that the threads do not die due to an Exception.
>> 
>> Please look at the stack trace below. We see all the threads waiting like
>> this:
>> 
>> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on
>> condition [0x00007efedae6d000]
>>   java.lang.Thread.State: WAITING (parking)
>>        at sun.misc.Unsafe.park(Native Method)
>>        - parking to wait for  <0x0000000640248618> (a
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>>        at
>> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>>        at
>> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>>        at
>> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>>        at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>>        at
>> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>>        at
>> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>>        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>>        at
>> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>>        at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>>        at
>> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>>        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>>        at
>> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>>        at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>>        at java.lang.Thread.run(Thread.java:662)
>> 
>>> 
>>> Thanks,
>>> 
>>> Jun
>>> 
>>> 
>>> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>>>> 
>>>> Our cluster configuration:
>>>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
>>>> instance running as well.
>>>> We have 15 topics defined. We are trying to use them as queue (JMS like)
>>>> by defining the same group across different kafka consumers.
>>>> On the consumer side, we are using High Level Consumer.
>>>> 
>>>> However we are seeing a weird behaviour.
>>>> One of our heavily used queue (event_queue) has 2 dedicated consumers
>>>> listening to that queue only.
>>>> This queue is defined with 150 partitions on each broker & the number of
>>>> streams defined on the 2 dedicated consumers is 150.
>>>> After a while we see that most the consumer threads keep waiting for
>>>> events and the lag keeps growing.
>>>> If we kill one of the dedicated consumers, then the other consumer
>> starts
>>>> getting messaging in a hurry.
>>>> 
>>>> Consumer had no Full GCs.
>>>> 
>>>> How we measure lag?
>>>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>>>> event_queue --zkconnect
>>>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
>> event_queue
>>>> 
>>>> Around the time, the events stopped coming to the new consumer.. this
>> was
>>>> printed on the logs:
>>>> 
>>>> [INFO] zookeeper state changed (Disconnected)
>>>> [INFO] zookeeper state changed (Disconnected)
>>>> [INFO] zookeeper state changed (SyncConnected)
>>>> [INFO] zookeeper state changed (SyncConnected)
>>>> 
>>>> Config Overidden:
>>>> Consumer:
>>>> fetch.size=3MB
>>>> autooffset.reset=largest
>>>> autocommit.interval.ms=500
>>>> Producer:
>>>> maxMessageSize=3MB
>>>> 
>>>> Please let us know if we are doing some wrong OR facing some known issue
>>>> here?
>>>> 
>>>> Thanks,
>>>> Nihit
>> 
>> 


Re: Kafka consumer not consuming events

Posted by Jun Rao <ju...@gmail.com>.
This indicates our in-memory queue is empty. So the consumer thread is
blocked. What about the Kafka fetcher threads? Are they blocked on anything?

Thanks,

Jun


On Tue, Jul 9, 2013 at 8:37 AM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hello Jun,
>
> Please see my comments inline.
>
> On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > I assume that each consumer instance consumes all 15 topics.
> No, we kept dedicated consumer listening to the topic in question.
> We did this because this queue processes huge amounts of data.
>
>
> > Are all your
> > consumer threads alive? If one of your thread dies, it will eventually
> > block the consumption in other threads.
>
> Yes. We can see all the threads in the thread dump.
> We have ensured that the threads do not die due to an Exception.
>
> Please look at the stack trace below. We see all the threads waiting like
> this:
>
> "event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on
> condition [0x00007efedae6d000]
>    java.lang.Thread.State: WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x0000000640248618> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
>         at
> java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
>         at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
>         at
> kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
>         at
> kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
>         at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
>         at
> com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
>         at
> java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
>
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com>
> wrote:
> >
> >> Hi,
> >>
> >> We are using kafka-0.7.2 with zookeeper (3.4.5)
> >>
> >> Our cluster configuration:
> >> 3 brokers on 3 different machines. Each broker machine has a zookeeper
> >> instance running as well.
> >> We have 15 topics defined. We are trying to use them as queue (JMS like)
> >> by defining the same group across different kafka consumers.
> >> On the consumer side, we are using High Level Consumer.
> >>
> >> However we are seeing a weird behaviour.
> >> One of our heavily used queue (event_queue) has 2 dedicated consumers
> >> listening to that queue only.
> >> This queue is defined with 150 partitions on each broker & the number of
> >> streams defined on the 2 dedicated consumers is 150.
> >> After a while we see that most the consumer threads keep waiting for
> >> events and the lag keeps growing.
> >> If we kill one of the dedicated consumers, then the other consumer
> starts
> >> getting messaging in a hurry.
> >>
> >> Consumer had no Full GCs.
> >>
> >> How we measure lag?
> >> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> >> event_queue --zkconnect
> >> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic
> event_queue
> >>
> >> Around the time, the events stopped coming to the new consumer.. this
> was
> >> printed on the logs:
> >>
> >> [INFO] zookeeper state changed (Disconnected)
> >> [INFO] zookeeper state changed (Disconnected)
> >> [INFO] zookeeper state changed (SyncConnected)
> >> [INFO] zookeeper state changed (SyncConnected)
> >>
> >> Config Overidden:
> >> Consumer:
> >> fetch.size=3MB
> >> autooffset.reset=largest
> >> autocommit.interval.ms=500
> >> Producer:
> >> maxMessageSize=3MB
> >>
> >> Please let us know if we are doing some wrong OR facing some known issue
> >> here?
> >>
> >> Thanks,
> >> Nihit
>
>

Re: Kafka consumer not consuming events

Posted by Nihit Purwar <np...@sprinklr.com>.
Hello Jun,

Please see my comments inline.

On 09-Jul-2013, at 8:32 PM, Jun Rao <ju...@gmail.com> wrote:

> I assume that each consumer instance consumes all 15 topics.
No, we kept dedicated consumer listening to the topic in question.
We did this because this queue processes huge amounts of data.


> Are all your
> consumer threads alive? If one of your thread dies, it will eventually
> block the consumption in other threads.

Yes. We can see all the threads in the thread dump.
We have ensured that the threads do not die due to an Exception.

Please look at the stack trace below. We see all the threads waiting like this:

"event_queue@150" prio=10 tid=0x00007eff28e41800 nid=0x31f9 waiting on condition [0x00007efedae6d000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000640248618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1987)
        at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:399)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:60)
        at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:32)
        at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:59)
        at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:51)
        at com.spr.messageprocessor.KafkaStreamRunnable.run(KafkaStreamRunnable.java:49)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
        at java.util.concurrent.FutureTask.run(FutureTask.java:138)
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
        at java.lang.Thread.run(Thread.java:662)

> 
> Thanks,
> 
> Jun
> 
> 
> On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com> wrote:
> 
>> Hi,
>> 
>> We are using kafka-0.7.2 with zookeeper (3.4.5)
>> 
>> Our cluster configuration:
>> 3 brokers on 3 different machines. Each broker machine has a zookeeper
>> instance running as well.
>> We have 15 topics defined. We are trying to use them as queue (JMS like)
>> by defining the same group across different kafka consumers.
>> On the consumer side, we are using High Level Consumer.
>> 
>> However we are seeing a weird behaviour.
>> One of our heavily used queue (event_queue) has 2 dedicated consumers
>> listening to that queue only.
>> This queue is defined with 150 partitions on each broker & the number of
>> streams defined on the 2 dedicated consumers is 150.
>> After a while we see that most the consumer threads keep waiting for
>> events and the lag keeps growing.
>> If we kill one of the dedicated consumers, then the other consumer starts
>> getting messaging in a hurry.
>> 
>> Consumer had no Full GCs.
>> 
>> How we measure lag?
>> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
>> event_queue --zkconnect
>> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>> 
>> Around the time, the events stopped coming to the new consumer.. this was
>> printed on the logs:
>> 
>> [INFO] zookeeper state changed (Disconnected)
>> [INFO] zookeeper state changed (Disconnected)
>> [INFO] zookeeper state changed (SyncConnected)
>> [INFO] zookeeper state changed (SyncConnected)
>> 
>> Config Overidden:
>> Consumer:
>> fetch.size=3MB
>> autooffset.reset=largest
>> autocommit.interval.ms=500
>> Producer:
>> maxMessageSize=3MB
>> 
>> Please let us know if we are doing some wrong OR facing some known issue
>> here?
>> 
>> Thanks,
>> Nihit


Re: Kafka consumer not consuming events

Posted by Jun Rao <ju...@gmail.com>.
I assume that each consumer instance consumes all 15 topics. Are all your
consumer threads alive? If one of your thread dies, it will eventually
block the consumption in other threads.

Thanks,

Jun


On Tue, Jul 9, 2013 at 4:18 AM, Nihit Purwar <np...@sprinklr.com> wrote:

> Hi,
>
> We are using kafka-0.7.2 with zookeeper (3.4.5)
>
> Our cluster configuration:
> 3 brokers on 3 different machines. Each broker machine has a zookeeper
> instance running as well.
> We have 15 topics defined. We are trying to use them as queue (JMS like)
> by defining the same group across different kafka consumers.
> On the consumer side, we are using High Level Consumer.
>
> However we are seeing a weird behaviour.
> One of our heavily used queue (event_queue) has 2 dedicated consumers
> listening to that queue only.
> This queue is defined with 150 partitions on each broker & the number of
> streams defined on the 2 dedicated consumers is 150.
> After a while we see that most the consumer threads keep waiting for
> events and the lag keeps growing.
> If we kill one of the dedicated consumers, then the other consumer starts
> getting messaging in a hurry.
>
> Consumer had no Full GCs.
>
> How we measure lag?
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group
> event_queue --zkconnect
> zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka --topic event_queue
>
> Around the time, the events stopped coming to the new consumer.. this was
> printed on the logs:
>
> [INFO] zookeeper state changed (Disconnected)
> [INFO] zookeeper state changed (Disconnected)
> [INFO] zookeeper state changed (SyncConnected)
> [INFO] zookeeper state changed (SyncConnected)
>
> Config Overidden:
> Consumer:
> fetch.size=3MB
> autooffset.reset=largest
> autocommit.interval.ms=500
> Producer:
> maxMessageSize=3MB
>
> Please let us know if we are doing some wrong OR facing some known issue
> here?
>
> Thanks,
> Nihit