You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Jerry Wei (Jira)" <ji...@apache.org> on 2020/07/12 06:30:00 UTC

[jira] [Comment Edited] (KAFKA-10134) High CPU issue during rebalance in Kafka consumer after upgrading to 2.5

    [ https://issues.apache.org/jira/browse/KAFKA-10134?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17156214#comment-17156214 ] 

Jerry Wei edited comment on KAFKA-10134 at 7/12/20, 6:29 AM:
-------------------------------------------------------------

[~guozhang], I have three brokers and 10 consumers. When I restart one of consumers, some of other consumers will be with high CPU issue. 


{code:java}
// from KafkaConsumer.java (a fine fix)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);
                if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer if we still have
                    // some assigned partitions, since even if we are 1) in the middle of a rebalance
                    // or 2) have partitions with unknown starting positions we may still want to return some data
                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
                    // to never block on completing the rebalance procedure if there's any
                    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {
                        updateAssignmentMetadataIfNeeded(timer);
                    } else {
                        final Timer updateMetadataTimer = time.timer(0L);
                        updateAssignmentMetadataIfNeeded(updateMetadataTimer);
                        timer.update(updateMetadataTimer.currentTimeMs());
                    }
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }
{code}


{code:java}
// from KafkaConsumer.java (last commit)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);

               if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer for join group
                    updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                        log.warn("Still waiting for metadata");
                    }
                }
{code}

Per the above two commits I have one question about `updateAssignmentMetadataIfNeeded(timer, false);` why the second parameter is false? Per my understanding when it's false, actually it's same as before `updateAssignmentMetadataIfNeeded(time.timer(0L));` 
I've tested w/ true, it looks fine for us and I'm thinking when it's w/ true, the behavior is similar to [the commit|https://github.com/apache/kafka/pull/8934/commits/333a967ec22ea22babf32b18349b76b6552a2fac].


was (Author: zhowei):
[~guozhang], I have three brokers and 10 consumers. When I restart one of consumers, some of other consumers will be with high CPU issue. 


{code:java}
// from KafkaConsumer.java (a fine fix)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);
                if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer if we still have
                    // some assigned partitions, since even if we are 1) in the middle of a rebalance
                    // or 2) have partitions with unknown starting positions we may still want to return some data
                    // as long as there are some partitions fetchable; NOTE we always use a timer with 0ms
                    // to never block on completing the rebalance procedure if there's any
                    if (subscriptions.fetchablePartitions(tp -> true).isEmpty()) {
                        updateAssignmentMetadataIfNeeded(timer);
                    } else {
                        final Timer updateMetadataTimer = time.timer(0L);
                        updateAssignmentMetadataIfNeeded(updateMetadataTimer);
                        timer.update(updateMetadataTimer.currentTimeMs());
                    }
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE))) {
                        log.warn("Still waiting for metadata");
                    }
                }
{code}


{code:java}
// from KafkaConsumer.java (last commit)
// private ConsumerRecords<K, V> poll(final Timer timer, final boolean includeMetadataInTimeout);

               if (includeMetadataInTimeout) {
                    // try to update assignment metadata BUT do not need to block on the timer for join group
                    updateAssignmentMetadataIfNeeded(timer, false);
                } else {
                    while (!updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE), true)) {
                        log.warn("Still waiting for metadata");
                    }
                }
{code}

Per the above two commits I have one question about `updateAssignmentMetadataIfNeeded(timer, false);` why the second parameter is false? Per my understanding when it's false, actually it's same as before `updateAssignmentMetadataIfNeeded(time.timer(0L));` 
I've tested w/ true, it looks fine for us and I'm thinking when it's w/ true, the behavior is similar to [ the commit|https://github.com/apache/kafka/pull/8934/commits/333a967ec22ea22babf32b18349b76b6552a2fac].

> High CPU issue during rebalance in Kafka consumer after upgrading to 2.5
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-10134
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10134
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 2.5.0
>            Reporter: Sean Guo
>            Assignee: Guozhang Wang
>            Priority: Blocker
>             Fix For: 2.6.0, 2.5.1
>
>
> We want to utilize the new rebalance protocol to mitigate the stop-the-world effect during the rebalance as our tasks are long running task.
> But after the upgrade when we try to kill an instance to let rebalance happen when there is some load(some are long running tasks >30S) there, the CPU will go sky-high. It reads ~700% in our metrics so there should be several threads are in a tight loop. We have several consumer threads consuming from different partitions during the rebalance. This is reproducible in both the new CooperativeStickyAssignor and old eager rebalance rebalance protocol. The difference is that with old eager rebalance rebalance protocol used the high CPU usage will dropped after the rebalance done. But when using cooperative one, it seems the consumers threads are stuck on something and couldn't finish the rebalance so the high CPU usage won't drop until we stopped our load. Also a small load without long running task also won't cause continuous high CPU usage as the rebalance can finish in that case.
>  
> "executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable  [0x00007fe119aab000]"executor.kafka-consumer-executor-4" #124 daemon prio=5 os_prio=0 cpu=76853.07ms elapsed=841.16s tid=0x00007fe11f044000 nid=0x1f4 runnable  [0x00007fe119aab000]   java.lang.Thread.State: RUNNABLE at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:467) at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1241) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) at
>  
> By debugging into the code we found it looks like the clients are  in a loop on finding the coordinator.
> I also tried the old rebalance protocol for the new version the issue still exists but the CPU will be back to normal when the rebalance is done.
> Also tried the same on the 2.4.1 which seems don't have this issue. So it seems related something changed between 2.4.1 and 2.5.0.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)