You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Peter Davis (Jira)" <ji...@apache.org> on 2020/09/04 16:08:00 UTC

[jira] [Comment Edited] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

    [ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190802#comment-17190802 ] 

Peter Davis edited comment on KAFKA-10134 at 9/4/20, 4:07 PM:
--------------------------------------------------------------

I'd like to see the title of this bug clarified: It is worse than just "High CPU usage".  I have a couple of Kafka Streams apps with a high number of tasks/threads and this issue is causing infinite rebalance loops where the entire *cluster stops processing and cannot successfully rebalance*.  This causes hard downtime.  I've had to roll back to 2.4.1.

Edit: clarification: tested with 2.6.0.  Have not tested 2.5.

I'm currently working on building the patch, will test.

 

Late to the party since you've already got the fix in progress, but in case it helps, I'd like to share what I'm seeing:

The rebalance failures seems to be associated the TimeoutExceptions, DisconnectionExceptions and other side effects as noted in earlier comments.  When many StreamThreads are all spinning, then each time a rebalance is attempted, when there are a large number of threads it is likely that _some_ thread will fail, and the rebalance never succeeds.  The downward spiral begins as ConsumerThreads become "fenced" and it triggers a full (not incremental) rebalance, and eventually all data flow gets blocked.  I've tried different combinations of session.timeout.ms, rebalance.timeout.ms, max.poll.time.ms, default.api.timeout.ms (as recommended in the text of the timeout exceptions) to no avail.

Of my applications, the ones that are affected include
 * one stateless app with num.stream.threads=24.  With more than 1 instance (2-4x=48-96 threads), it will often never rebalance correctly, or only after multiple attempts (30+ minutes).  
 * one stateful app with 36 partitions of large-ish (500MB-1GB each) state stores which can take a while to restore.  This app successfully starts if I shut down all instances, delete state stores, set initial rebalance delay, and start all up simultaneously – but if any instance restarts or I attempt to scale up later, then rebalance will never succeed.  Additionally, when state stores are reassigned, there are "LockExceptions" (DEBUG level logs) in a tight loop, and the state stores fail to be closed cleanly, which forces the restore process to begin all over again.  The only way I can successfully do a rolling restart is if I use static membership and increase the session timeout.  If there is only a single instance of the app, then it works with no problems (but this is not a solution as I need multiple instances for scale).

Other side effects: the tight loop logs several DEBUG logs, which filled up log storage and caused pod evictions, which caused state stores to become invalid and restore (workaround: disable this logging).

Additionally, have seen the following exceptions sporadically, not sure if these are separate bugs:

{{2020-08-31T00:40:47.786Z ERROR Uncaught stream processing error! KafkaStreamsConfiguration java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0) , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or *because static member is configured and the protocol is buggy* hence did not get the assignment for this member}}
 {{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)}}
 {{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}}
 {{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}}
 {{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}}
 {{    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}}
 {{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}}
 {{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}}

{{2020-09-03T15:53:17.524Z ERROR Uncaught stream processing error! KafkaStreamsConfiguration java.lang.IllegalStateException: Active task 3_0 should have been suspended}}
 {{    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:281)}}
 {{    ... 13 common frames omitted}}
 {{Wrapped by: java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[3_0]]. First unexpected exception (for task 3_0) follows.}}
 {{    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:349)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)}}
 {{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)}}
 {{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)}}
 {{    ... 10 common frames omitted}}
 {{Wrapped by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error}}
 {{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)}}
 {{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}}
 {{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}}
 {{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}}
 {{    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}}
 {{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}}
 {{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:628)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}}
 {{    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}}


was (Author: davispw):
I'd like to see the title of this bug clarified: It is worse than just "High CPU usage".  I have a couple of Kafka Streams apps with a high number of tasks/threads and this issue is causing infinite rebalance loops where the entire *cluster stops processing and cannot successfully rebalance*.  This causes hard downtime.  I've had to roll back to 2.4.1.

I'm currently working on building the patch, will test.

 

Late to the party since you've already got the fix in progress, but in case it helps, I'd like to share what I'm seeing:

The rebalance failures seems to be associated the TimeoutExceptions, DisconnectionExceptions and other side effects as noted in earlier comments.  When many StreamThreads are all spinning, then each time a rebalance is attempted, when there are a large number of threads it is likely that _some_ thread will fail, and the rebalance never succeeds.  The downward spiral begins as ConsumerThreads become "fenced" and it triggers a full (not incremental) rebalance, and eventually all data flow gets blocked.  I've tried different combinations of session.timeout.ms, rebalance.timeout.ms, max.poll.time.ms, default.api.timeout.ms (as recommended in the text of the timeout exceptions) to no avail.

Of my applications, the ones that are affected include
 * one stateless app with num.stream.threads=24.  With more than 1 instance (2-4x=48-96 threads), it will often never rebalance correctly, or only after multiple attempts (30+ minutes).  
 * one stateful app with 36 partitions of large-ish (500MB-1GB each) state stores which can take a while to restore.  This app successfully starts if I shut down all instances, delete state stores, set initial rebalance delay, and start all up simultaneously – but if any instance restarts or I attempt to scale up later, then rebalance will never succeed.  Additionally, when state stores are reassigned, there are "LockExceptions" (DEBUG level logs) in a tight loop, and the state stores fail to be closed cleanly, which forces the restore process to begin all over again.  The only way I can successfully do a rolling restart is if I use static membership and increase the session timeout.  If there is only a single instance of the app, then it works with no problems (but this is not a solution as I need multiple instances for scale).

Other side effects: the tight loop logs several DEBUG logs, which filled up log storage and caused pod evictions, which caused state stores to become invalid and restore (workaround: disable this logging).

Additionally, have seen the following exceptions sporadically, not sure if these are separate bugs:

{{2020-08-31T00:40:47.786Z ERROR Uncaught stream processing error! KafkaStreamsConfiguration java.lang.IllegalStateException: There are insufficient bytes available to read assignment from the sync-group response (actual byte size 0) , this is not expected; it is possible that the leader's assign function is buggy and did not return any assignment for this member, or *because static member is configured and the protocol is buggy* hence did not get the assignment for this member}}
{{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:367)}}
{{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}}
{{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}}
{{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}}
{{    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}}
{{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}}
{{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:624)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}}

{{2020-09-03T15:53:17.524Z ERROR Uncaught stream processing error! KafkaStreamsConfiguration java.lang.IllegalStateException: Active task 3_0 should have been suspended}}
{{    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:281)}}
{{    ... 13 common frames omitted}}
{{Wrapped by: java.lang.RuntimeException: Unexpected failure to close 1 task(s) [[3_0]]. First unexpected exception (for task 3_0) follows.}}
{{    at org.apache.kafka.streams.processor.internals.TaskManager.handleAssignment(TaskManager.java:349)}}
{{    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.onAssignment(StreamsPartitionAssignor.java:1428)}}
{{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokeOnAssignment(ConsumerCoordinator.java:279)}}
{{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:421)}}
{{    ... 10 common frames omitted}}
{{Wrapped by: org.apache.kafka.common.KafkaException: User rebalance callback throws an error}}
{{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:436)}}
{{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:440)}}
{{    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)}}
{{    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:513)}}
{{    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1268)}}
{{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)}}
{{    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:766)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:628)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)}}
{{    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)}}

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-10134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Sean Guo
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.5.2, 2.6.1
>
>         Attachments: consumer3.log.2020-08-20.log, consumer5.log.2020-07-22.log
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen when there is some load(some are long running tasks >30S) there, the CPU will go sky-high. It reads ~700% in our metrics so there should be several threads are in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible in both the new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The difference is that with old eager rebalance rebalance protocol used the high CPU usage will dropped after the rebalance done. But when using cooperative one, it seems the consumers threads are stuck on something and couldn't finish the rebalance so the high CPU usage won't drop until we stopped our load. Also a small load without long running task also won't cause continuous high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable  [0x00007fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable  [0x00007fe119aab000]   java.lang.Thread.State: RUNNABLE at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at
>  
> By debugging into the code we found it looks like the clients are  in a loop on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)