You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "RivenSun (Jira)" <ji...@apache.org> on 2023/02/17 05:22:00 UTC

[jira] [Updated] (KAFKA-14729) The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread

     [ https://issues.apache.org/jira/browse/KAFKA-14729?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

RivenSun updated KAFKA-14729:
-----------------------------
    Description: 
h2. case situation:

1. The business program occupies a large amount of memory, causing the `run()` method of HeartbeatThread of kafkaConsumer to exit abnormally.
{code:java}
2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=asyncmq_local_us_dev-webcachesync_fcd02f8e-4f7e-4829-93ed-e8fa9cdc81f2_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space {code}
2. The finally module of the heartbeat thread ` run()` method only prints the log, but does not update the value of `AbstractCoordinator.state`.
3. For kafkaConsumer with the groupRebalance mechanism enabled, in the `kafkaConsumer#pollForFetches(timer)` method, pollTimeout may eventually take the value `timeToNextHeartbeat(now)`.
4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will never be updated again.
And the `AbstractCoordinator.state` field value will always be {*}STABLE{*},
So the `timeToNextHeartbeat(long now)` method will return {color:#ff0000}0{color}.
0 will be passed to the underlying `networkClient#poll` method.

 

In the end, the user calls the `poll(duration)` method in an endless loop, and the `kafkaConsumer#pollForFetches(timer)` method will always return very quickly, taking up a lot of cpu.

 
h2. solution:

1. Refer to the note of `MemberState.STABLE` :
{code:java}
the client has joined and is sending heartbeats.{code}
When the heartbeat thread exits, in `finally` module, we should add code:
{code:java}
state = MemberState.UNJOINED;
closed = true;{code}
2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new judgment condition: `heartbeatThread.hasFailed()`
{code:java}
if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
    return Long.MAX_VALUE;
return heartbeat.timeToNextHeartbeat(now);{code}
 

  was:
h2. case situation:

1. The business program occupies a large amount of memory, causing the `run()` method of HeartbeatThread of kafkaConsumer to exit abnormally.
{code:java}
2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator:1105][kafka-coor][Consumer clientId=consumer-5, groupId=asyncmq_local_us_dev-webcachesync_fcd02f8e-4f7e-4829-93ed-e8fa9cdc81f2_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space {code}

2. The finally module of the heartbeat thread ` run()` method only prints the log, but does not update the value of `AbstractCoordinator.state`.
3. For kafkaConsumer with the groupRebalance mechanism enabled, in the `kafkaConsumer#pollForFetches(timer)` method, pollTimeout may eventually take the value `timeToNextHeartbeat(now)`.
4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will never be updated again.
And the `AbstractCoordinator.state` field value will always be {*}STABLE{*},
So the `timeToNextHeartbeat(long now)` method will return {color:#FF0000}0{color}.
0 will be passed to the underlying `networkClient#poll` method.

 

In the end, the user calls the `poll(duration)` method in an endless loop, and the `kafkaConsumer#pollForFetches(timer)` method will always return very quickly, taking up a lot of cpu.

 
h2. solution:


1. Refer to the note of `MemberState.STABLE` : 
{code:java}
the client has joined and is sending heartbeats.{code}

When the heartbeat thread exits, in `finally` module, we should add code:
{code:java}
state = MemberState.UNJOINED;
closed = true;{code}

2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new judgment condition: `heartbeatThread.hasFailed()`
{code:java}
if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
    return Long.MAX_VALUE;
return heartbeat.timeToNextHeartbeat(now);{code}
 


> The kafakConsumer pollForFetches(timer) method takes up a lot of cpu due to the abnormal exit of the heartbeat thread
> ---------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14729
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14729
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 3.3.0, 3.3.2
>            Reporter: RivenSun
>            Assignee: RivenSun
>            Priority: Major
>         Attachments: image-2023-02-17-13-15-50-362.png, jstack_highCpu.txt
>
>
> h2. case situation:
> 1. The business program occupies a large amount of memory, causing the `run()` method of HeartbeatThread of kafkaConsumer to exit abnormally.
> {code:java}
> 2023-02-14 06:55:57.771[][ERROR][AbstractCoordinator][kafka-coor][Consumer clientId=consumer-5, groupId=asyncmq_local_us_dev-webcachesync_fcd02f8e-4f7e-4829-93ed-e8fa9cdc81f2_dev_VA] Heartbeat thread failed due to unexpected error java.lang.OutOfMemoryError: Java heap space {code}
> 2. The finally module of the heartbeat thread ` run()` method only prints the log, but does not update the value of `AbstractCoordinator.state`.
> 3. For kafkaConsumer with the groupRebalance mechanism enabled, in the `kafkaConsumer#pollForFetches(timer)` method, pollTimeout may eventually take the value `timeToNextHeartbeat(now)`.
> 4. Since the heartbeat thread has exited, `heartbeatTimer.deadlineMs` will never be updated again.
> And the `AbstractCoordinator.state` field value will always be {*}STABLE{*},
> So the `timeToNextHeartbeat(long now)` method will return {color:#ff0000}0{color}.
> 0 will be passed to the underlying `networkClient#poll` method.
>  
> In the end, the user calls the `poll(duration)` method in an endless loop, and the `kafkaConsumer#pollForFetches(timer)` method will always return very quickly, taking up a lot of cpu.
>  
> h2. solution:
> 1. Refer to the note of `MemberState.STABLE` :
> {code:java}
> the client has joined and is sending heartbeats.{code}
> When the heartbeat thread exits, in `finally` module, we should add code:
> {code:java}
> state = MemberState.UNJOINED;
> closed = true;{code}
> 2. In the `AbstractCoordinator#timeToNextHeartbeat(now)` method, add a new judgment condition: `heartbeatThread.hasFailed()`
> {code:java}
> if (state.hasNotJoinedGroup() || heartbeatThread.hasFailed())
>     return Long.MAX_VALUE;
> return heartbeat.timeToNextHeartbeat(now);{code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)