Posted to dev@kafka.apache.org by Jaikiran Pai <ja...@gmail.com> on 2016/11/02 14:14:56 UTC

0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

We have been trying to narrow down an issue with Kafka 0.10.1 in our 
setups, where our consumers are marked as dead very frequently, causing 
rebalances every few seconds. The consumer (new Java API) then 
starts seeing exceptions like:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot 
be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between 
subsequent calls to poll() was longer than the configured 
max.poll.interval.ms, which typically implies that the poll loop is 
spending too much time message processing. You can address this either 
by increasing the session timeout or by reducing the maximum size of 
batches returned in poll() with max.poll.records.
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]


Our session and heartbeat timeouts are defaults that ship in Kafka 
0.10.1 (i.e. we don't set any specific values). Every few seconds, we 
see messages on the broker logs which indicate these consumers are 
considered dead:

[2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has failed (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to restabilize group foo-bar with old generation 1034 (kafka.coordinator.GroupCoordinator)
[2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
....

These messages keep repeating for almost every other consumer we have 
(in different groups).
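
To make "defaults" concrete, here is a minimal sketch of the consumer 
settings that decide when a member is considered dead. The values are 
what I believe the 0.10.1.0 client defaults to; treat them as 
assumptions and check them against ConsumerConfig for your version. The 
class and method names are hypothetical and only for illustration.

```java
import java.util.Properties;

public class ConsumerTimeoutSettings {

    // Timeout-related consumer properties discussed in this thread.
    // ASSUMPTION: these values mirror the 0.10.1.0 client defaults.
    static Properties timeoutSettings() {
        Properties props = new Properties();
        // Coordinator marks the member dead if no heartbeat arrives within this window.
        props.setProperty("session.timeout.ms", "10000");
        // How often the background thread tries to send a heartbeat.
        props.setProperty("heartbeat.interval.ms", "3000");
        // Maximum allowed gap between two calls to poll().
        props.setProperty("max.poll.interval.ms", "300000");
        // Upper bound on the number of records returned by one poll().
        props.setProperty("max.poll.records", "500");
        return props;
    }

    // The Kafka docs suggest keeping the heartbeat interval at no more than
    // one third of the session timeout; this checks that rule of thumb.
    static boolean heartbeatFitsInSession(Properties props) {
        long sessionMs = Long.parseLong(props.getProperty("session.timeout.ms"));
        long heartbeatMs = Long.parseLong(props.getProperty("heartbeat.interval.ms"));
        return heartbeatMs * 3 <= sessionMs;
    }

    public static void main(String[] args) {
        Properties props = timeoutSettings();
        props.list(System.out);
        System.out.println("heartbeat fits in session: " + heartbeatFitsInSession(props));
    }
}
```

With these values a member has roughly three heartbeat attempts per 
session window, so a heartbeat thread that is stalled for more than 
about 10 seconds would be enough to get the member expired.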

There's no real logic in our consumers and they just pick up the message 
from partitions, commit the offset, and hand it immediately to a 
different thread to process the message and go back to polling:

        while (!stopped) {
            try {
                final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
                for (final TopicPartition topicPartition : consumerRecords.partitions()) {
                    if (stopped) {
                        break;
                    }
                    for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
                        final long previousOffset = consumerRecord.offset();
                        // commit the offset and then pass on the message for processing (in a separate thread)
                        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(previousOffset + 1)));

                        this.executor.execute(new Runnable() {
                            @Override
                            public void run() {
                                // process the ConsumerRecord
                            }
                        });
                    }
                }
            } catch (Exception e) {
                // log the error and continue
                continue;
            }
        }



We haven't been able to figure out why the heartbeats wouldn't be sent 
by the consumer in the expected time period. From my understanding of 
the docs, the heartbeats are sent in the background thread for the 
consumer, so there should be no real reason why these wouldn't be sent.


We debugged this a bit further and got some thread dumps from the JVM of 
the consumers and here's what we see:

"*kafka-coordinator-heartbeat-thread* | foo-bar" #28 daemon prio=5 os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry [0x00007f0dd54c7000]
    java.lang.Thread.State: *BLOCKED* (on object monitor)
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
     - *waiting to lock <0x00000000c0962bb0>* (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:864)
     - locked <0x00000000c0962578> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

So it looks like the heartbeat thread is *blocked* waiting for an object 
monitor, and that monitor is held by:


"thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27 runnable [0x00007f0dcdffc000]
    java.lang.Thread.State: RUNNABLE
     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
     - locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
     - locked <0x00000000c063b810> (a java.util.Collections$UnmodifiableSet)
     - locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
     at org.apache.kafka.common.network.Selector.select(Selector.java:470)
     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
     - *locked* <*0x00000000c0962bb0*> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
     at org.apache.kafka.clients.consumer.*KafkaConsumer*.*poll*(KafkaConsumer.java:979)
     at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)
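
The pattern in the two dumps above can be reproduced outside Kafka. 
The following is a minimal sketch, not Kafka code: SharedClient is a 
hypothetical stand-in for ConsumerNetworkClient, on the assumption (which 
the dumps suggest) that both poll paths synchronize on the same 
instance. One thread holds the monitor while it waits, and a second 
thread that wants the same monitor shows up as BLOCKED.

```java
import java.util.concurrent.CountDownLatch;

public class MonitorContentionDemo {

    // Hypothetical stand-in for a client whose methods share one object monitor.
    static class SharedClient {
        // Simulates a long poll: takes the monitor and keeps it while waiting.
        synchronized void poll(CountDownLatch entered, CountDownLatch release) {
            entered.countDown();           // tell the caller the monitor is now held
            try {
                release.await();           // parks the thread but does NOT release the monitor
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // Simulates the heartbeat path: needs the same monitor to do anything.
        synchronized void pollNoWakeup() {
        }
    }

    // Returns the state of the "heartbeat" thread while the "poller" holds the monitor.
    static Thread.State observeHeartbeatState() {
        final SharedClient client = new SharedClient();
        final CountDownLatch entered = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);

        Thread poller = new Thread(() -> client.poll(entered, release), "poller");
        Thread heartbeat = new Thread(client::pollNoWakeup, "heartbeat");

        try {
            poller.start();
            entered.await();               // poller is now inside the synchronized method
            heartbeat.start();

            // Give the heartbeat thread up to 5 seconds to reach the contended monitor.
            long start = System.nanoTime();
            Thread.State state = heartbeat.getState();
            while (state != Thread.State.BLOCKED
                    && System.nanoTime() - start < 5_000_000_000L) {
                Thread.sleep(1);
                state = heartbeat.getState();
            }
            return state;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();           // let both threads finish
        }
    }

    public static void main(String[] args) {
        System.out.println("heartbeat thread state: " + observeHeartbeatState());
    }
}
```

One difference from the real dump: here the thread holding the monitor is 
parked on a latch rather than RUNNABLE inside epollWait, but the effect 
on the second thread is the same.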


So it looks like the consumer code which invokes the 
*KafkaConsumer.poll*(...) API to fetch the messages is blocking the 
heartbeat sender thread? Is this intentional? If so, wouldn't this delay 
the heartbeats being sent and cause the heartbeat task on the 
coordinator to expire, as per this logic I see on the coordinator:

   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
     member.awaitingJoinCallback != null ||
       member.awaitingSyncCallback != null ||
       member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline

From what I see and my limited understanding of this code, this would 
mark the member dead (as seen in the logs).
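
To spell out the arithmetic, here is a minimal Java sketch of the last 
clause of that check. It assumes the deadline passed in is the time of 
the last heartbeat seen when the check was scheduled, plus the session 
timeout; the two callback clauses are left out, and the class name and 
the numbers are hypothetical.

```java
public class MemberLivenessSketch {

    // Mirrors only the heartbeat clause of the coordinator check quoted above.
    static boolean shouldKeepMemberAlive(long latestHeartbeatMs,
                                         long sessionTimeoutMs,
                                         long heartbeatDeadlineMs) {
        return latestHeartbeatMs + sessionTimeoutMs > heartbeatDeadlineMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000L;   // ASSUMPTION: session timeout of 10 seconds
        long scheduledAtMs = 0L;           // heartbeat that scheduled the expiry check
        long deadlineMs = scheduledAtMs + sessionTimeoutMs;

        // Case 1: another heartbeat arrived at t=3000 ms, before the deadline.
        boolean aliveWithHeartbeat = shouldKeepMemberAlive(3_000L, sessionTimeoutMs, deadlineMs);

        // Case 2: the heartbeat thread was stalled, so the latest heartbeat is still t=0.
        boolean aliveWithoutHeartbeat = shouldKeepMemberAlive(scheduledAtMs, sessionTimeoutMs, deadlineMs);

        System.out.println("heartbeat arrived in time -> keep alive: " + aliveWithHeartbeat);
        System.out.println("no heartbeat since scheduling -> keep alive: " + aliveWithoutHeartbeat);
    }
}
```

Under these assumptions, a heartbeat thread that stays blocked for the 
whole session timeout leaves latestHeartbeat unchanged, the comparison 
is no longer strictly greater, and the member is expired.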


Is it expected that the background heartbeat sender thread would be 
blocked by poll on the consumer (*our poll timeout is 2 minutes*)? Or 
did I misread these logs and stack traces? Let me know if more 
logs/details are needed and I can get them.


-Jaikiran


Re: 0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

Posted by Jason Gustafson <ja...@confluent.io>.
Hey Jaikiran,

Thanks for the report. We shouldn't hold the lock in poll() longer than the
heartbeat interval, so if you're seeing this, it's probably a bug. Let me
see if I can reproduce it. One quick question about the code snippet. What
kind of executor are you using? Is the load on the topic small enough that
you never have to worry about the executor's queue filling up, or do you
have some approach for back pressure? If so, can you describe it?

Thanks,
Jason

On Wed, Nov 2, 2016 at 8:22 AM, Jaikiran Pai <ja...@gmail.com>
wrote:

> Thanks Ismael. Just checked, that one doesn't look like it's the same
> issue, but could be a similar one. In that JIRA it looks like the issue was
> probably addressed for the commitSync call. However, in this specific
> instance the KafkaConsumer.poll(...) call itself locks the object
> monitor of the ConsumerNetworkClient. The heartbeat thread in the
> background seems to be waiting to get hold of that object monitor and
> blocks on it.
>
> Implementation details aside, what are the expected semantics of the
> background heartbeat thread: would it fail to send a heartbeat for a
> consumer if the consumer is currently busy with poll(), commitSync() or any
> similar call? If so, would this lack of heartbeats (for a while) cause
> that member to be considered dead by the coordinator? My reading of
> the logs and my limited knowledge of the Kafka code seem to indicate that
> this is what's happening, either as per expected semantics or because of a
> possible bug.
>
> -Jaikiran
>
>
> On Wednesday 02 November 2016 08:39 PM, Ismael Juma wrote:
>
>> Maybe https://issues.apache.org/jira/browse/KAFKA-4303?

Re: 0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

Posted by Jaikiran Pai <ja...@gmail.com>.
Thanks Ismael. Just checked, that one doesn't look like it's the same 
issue, but could be a similar one. In that JIRA it looks like the issue 
was probably addressed for the commitSync call. However, in this 
specific instance the KafkaConsumer.poll(...) call itself locks the 
object monitor of the ConsumerNetworkClient. The heartbeat thread in 
the background seems to be waiting to get hold of that object monitor 
and blocks on it.

Implementation details aside, what are the expected semantics of the 
background heartbeat thread: would it fail to send a heartbeat for a 
consumer if the consumer is currently busy with poll(), commitSync() or 
any similar call? If so, would this lack of heartbeats (for a while) 
cause that member to be considered dead by the coordinator? My reading 
of the logs and my limited knowledge of the Kafka code seem to indicate 
that this is what's happening, either as per expected semantics or 
because of a possible bug.

-Jaikiran

On Wednesday 02 November 2016 08:39 PM, Ismael Juma wrote:
> Maybe https://issues.apache.org/jira/browse/KAFKA-4303?


Re: 0.10.1.0 - KafkaConsumer.poll() blocks background heartbeat thread causing consumer to be considered dead?

Posted by Ismael Juma <is...@gmail.com>.
Maybe https://issues.apache.org/jira/browse/KAFKA-4303?

On 2 Nov 2016 10:15 am, "Jaikiran Pai" <ja...@gmail.com> wrote:

> We have been trying to narrow down an issue in 0.10.1 of Kafka in our
> setups where our consumers are marked as dead very frequently causing
> rebalances almost every few seconds. The consumer (Java new API) then
> starts seeing exceptions like:
>
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be
> completed since the group has already rebalanced and assigned the
> partitions to another member. This means that the time between subsequent
> calls to poll() was longer than the configured max.poll.interval.ms,
> which typically implies that the poll loop is spending too much time
> message processing. You can address this either by increasing the session
> timeout or by reducing the maximum size of batches returned in poll() with
> max.poll.records.
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:674) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:615) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:742) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:722) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:479) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:316) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:256) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:180) ~[kafka-clients-0.10.1.0.jar!/:na]
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:499) ~[kafka-clients-0.10.1.0.jar!/:na]
>
>
> Our session and heartbeat timeouts are the defaults that ship with Kafka
> 0.10.1 (i.e. we don't set any specific values). Every few seconds, we see
> messages in the broker logs indicating that these consumers are considered
> dead:
>
> [2016-11-02 06:09:48,103] TRACE [GroupCoordinator 0]: Member
> consumer-1-efde1e11-fdc6-4801-8fba-20d58b7a30b6 in group foo-bar has
> failed (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Preparing to
> restabilize group foo-bar with old generation 1034
> (kafka.coordinator.GroupCoordinator)
> [2016-11-02 06:09:48,103] INFO [GroupCoordinator 0]: Group foo-bar with
> generation 1035 is now empty (kafka.coordinator.GroupCoordinator)
> ....
>
> These messages keep repeating for almost every other consumer we have (in
> different groups).
>
> There's no real logic in our consumers. They just pick up each message
> from the partitions, commit its offset, hand the message immediately to a
> different thread for processing, and go back to polling:
>
>     while (!stopped) {
>         try {
>             final ConsumerRecords<K, V> consumerRecords = consumer.poll(someValue);
>             for (final TopicPartition topicPartition : consumerRecords.partitions()) {
>                 if (stopped) {
>                     break;
>                 }
>                 for (final ConsumerRecord<K, V> consumerRecord : consumerRecords.records(topicPartition)) {
>                     final long previousOffset = consumerRecord.offset();
>                     // commit the offset and then pass on the message for
>                     // processing (in a separate thread)
>                     consumer.commitSync(Collections.singletonMap(topicPartition,
>                             new OffsetAndMetadata(previousOffset + 1)));
>
>                     this.executor.execute(new Runnable() {
>                         @Override
>                         public void run() {
>                             // process the ConsumerRecord
>                         }
>                     });
>                 }
>             }
>         } catch (Exception e) {
>             // log the error and continue
>             continue;
>         }
>     }
>
>
>
> We haven't been able to figure out why the heartbeats wouldn't be sent by
> the consumer in the expected time period. From my understanding of the
> docs, the heartbeats are sent in the background thread for the consumer, so
> there should be no real reason why these wouldn't be sent.
>
>
> We debugged this a bit further and got some thread dumps from the JVM of
> the consumers and here's what we see:
>
> "kafka-coordinator-heartbeat-thread | foo-bar" #28 daemon prio=5 os_prio=0 tid=0x00007f0d7c0ee000 nid=0x2e waiting for monitor entry [0x00007f0dd54c7000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.disableWakeups(ConsumerNetworkClient.java:409)
>     - waiting to lock <0x00000000c0962bb0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:264)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:864)
>     - locked <0x00000000c0962578> (a org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
>
> So it looks like the heartbeat thread is *blocked* waiting for an object
> lock, and that lock is held by:
>
>
> "thread-1" #27 daemon prio=5 os_prio=0 tid=0x00007f0dec3c1800 nid=0x27 runnable [0x00007f0dcdffc000]
>    java.lang.Thread.State: RUNNABLE
>     at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
>     at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
>     at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
>     at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
>     - locked <0x00000000c063b820> (a sun.nio.ch.Util$3)
>     - locked <0x00000000c063b810> (a java.util.Collections$UnmodifiableSet)
>     - locked <0x00000000c05f9a70> (a sun.nio.ch.EPollSelectorImpl)
>     at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
>     at org.apache.kafka.common.network.Selector.select(Selector.java:470)
>     at org.apache.kafka.common.network.Selector.poll(Selector.java:286)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
>     - locked <0x00000000c0962bb0> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1031)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
>     at org.myapp.KafkaMessageReceiver.start(KafkaMessageReceiver.java:72)
>
>
> So it looks like the consumer code which invokes the
> KafkaConsumer.poll(...) API to fetch the messages is blocking the
> heartbeat sender thread? Is this intentional? If so, wouldn't this delay
> the heartbeats being sent and cause the heartbeat task on the coordinator
> to expire, as per this logic I see on the coordinator:
>
>   private def shouldKeepMemberAlive(member: MemberMetadata, heartbeatDeadline: Long) =
>     member.awaitingJoinCallback != null ||
>       member.awaitingSyncCallback != null ||
>       member.latestHeartbeat + member.sessionTimeoutMs > heartbeatDeadline
>
> From what I see, with my limited understanding of this code, this would
> mark the member dead (as seen in the logs).
>
>
> Is it expected that the background heartbeat sender thread would be
> blocked by poll on the consumer (*our poll timeout is 2 minutes*)? Or did I
> misread these logs and stack traces? Let me know if more logs/details are
> needed and I can get them.
>
>
> -Jaikiran
>
>