Posted to jira@kafka.apache.org by "Kyle R Stehbens (Jira)" <ji...@apache.org> on 2022/06/17 16:43:00 UTC

[jira] [Comment Edited] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception

    [ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17555698#comment-17555698 ] 

Kyle R Stehbens edited comment on KAFKA-13840 at 6/17/22 4:42 PM:
------------------------------------------------------------------

Sure, this looks like the same kind of coordinator refresh loop, where it has a stale findCoordinatorFuture that is not cleared after a reconnect.

The MirrorMaker (MM) logs show recurring batches of the following lines:

```
[2022-06-17 16:16:20,956] INFO [Consumer clientId=consumer-mm.dmz.to.internal-9, groupId=mm.dmz.to.internal] Node 2147483642 disconnected. (org.apache.kafka.clients.NetworkClient:935)
[2022-06-17 16:16:20,956] INFO [Consumer clientId=consumer-mm.dmz.to.internal-9, groupId=mm.dmz.to.internal] Group coordinator internal-kaf05.company.corp:9092 (id: 2147483642 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: true. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:916)
[2022-06-17 16:16:20,958] INFO [Consumer clientId=consumer-mm.dmz.to.internal-9, groupId=mm.dmz.to.internal] Discovered group coordinator internal-kaf05.company.corp:9092 (id: 2147483642 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:853)
[2022-06-17 16:16:20,958] INFO [Consumer clientId=consumer-mm.dmz.to.internal-9, groupId=mm.dmz.to.internal] Group coordinator internal-kaf05.company.corp:9092 (id: 2147483642 rack: null) is unavailable or invalid due to cause: coordinator unavailable.isDisconnected: false. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:916)
[2022-06-17 16:16:20,958] INFO [Consumer clientId=consumer-mm.dmz.to.internal-9, groupId=mm.dmz.to.internal] Requesting disconnect from last known coordinator internal-kaf05.company.corp:9092 (id: 2147483642 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:929)
[2022-06-17 16:16:21,060] INFO [Consumer clientId=consumer-mm.dmz.to.internal-9, groupId=mm.dmz.to.internal] Discovered group coordinator internal-kaf05.company.corp:9092 (id: 2147483642 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:853)
```
 
The second log line here can only be triggered by this line:
[https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L892]
 
The coordinator is then rediscovered successfully, but the fourth log line in the set reports that it is not disconnected and must be rediscovered again.
This is because this check here:
[https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L244]
returns a stale future from the lookupCoordinator() call, because that future is never properly cleared.
 
Something seems seriously broken in the coordinator reconnect logic, and it appears to stem mostly from not clearing the future after each attempt, so stale results are used in subsequent checks.
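To make the suspected failure mode concrete, here is a minimal Java sketch of a cached lookup future that is never cleared. It is a simplified, hypothetical model, not Kafka's AbstractCoordinator: the class name StaleCoordinatorLookup, the sendFindCoordinator supplier, and the use of CompletableFuture (Kafka internally uses its own RequestFuture type) are assumptions made for illustration. The first simulated FindCoordinator request fails and every later one would succeed, yet no later request is ever sent.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical, simplified model of the cached-lookup pattern; NOT Kafka's code.
public class StaleCoordinatorLookup {
    // Assumed stand-in for sending a FindCoordinator request to the cluster.
    private final Supplier<CompletableFuture<String>> sendFindCoordinator;
    // Cached lookup; in this sketch nothing ever resets it to null.
    private CompletableFuture<String> findCoordinatorFuture = null;
    int requestsSent = 0;

    StaleCoordinatorLookup(Supplier<CompletableFuture<String>> sendFindCoordinator) {
        this.sendFindCoordinator = sendFindCoordinator;
    }

    CompletableFuture<String> lookupCoordinator() {
        // Only send a new request when no future is cached.
        if (findCoordinatorFuture == null) {
            requestsSent++;
            findCoordinatorFuture = sendFindCoordinator.get();
        }
        // The bug being illustrated: a future that has already completed
        // (here, exceptionally) is handed back to every later caller.
        return findCoordinatorFuture;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // The first simulated request fails; any later request would succeed.
        StaleCoordinatorLookup lookup = new StaleCoordinatorLookup(() ->
                attempts[0]++ == 0
                        ? CompletableFuture.<String>failedFuture(new RuntimeException("coordinator unavailable"))
                        : CompletableFuture.completedFuture("internal-kaf05.company.corp:9092"));

        for (int i = 0; i < 3; i++) {
            CompletableFuture<String> f = lookup.lookupCoordinator();
            System.out.println("attempt " + i + ": failed=" + f.isCompletedExceptionally());
        }
        System.out.println("requests sent: " + lookup.requestsSent);
    }
}
```

Running main prints `failed=true` for all three attempts and `requests sent: 1`: once the first request fails, every later lookup receives the same failed future, which is consistent with the repeated rediscovery described above.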

 

(I should note there was nothing of real interest in the broker log at this time.)

Plenty of other clients were using the broker at the same time, so this is not a broker-side issue.



> KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
> -----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-13840
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13840
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>    Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>            Reporter: Kyle R Stehbens
>            Assignee: Luke Chen
>            Priority: Major
>
> Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a timeout, or any other retriable exception, triggered during an async offset commit renders the client unable to recover its group coordinator and leaves it in a broken state.
>  
> I first encountered this using v2.8.1 of the Java client and, after going through the code base for every release, found that it affects all versions of the client from 2.6.1 onward.
> I also confirmed that the issue is not present after rolling back to 2.5.1.
>  
> The issue stems from a change to the FindCoordinatorResponseHandler: in 2.5.1 it called clearFindCoordinatorFuture() on both success and failure here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all later versions of the client, this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> As a result, when the KafkaConsumer calls coordinator.commitOffsetsAsync(...) and an error occurs such that the coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> then the client will try to call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However, this can never succeed, because it perpetually returns a reference to a failed future (findCoordinatorFuture) that is never cleared.
>  
> This manifests as every subsequent call to commitOffsetsAsync() throwing a "coordinator unavailable" exception once any retriable exception causes the coordinator to close.
> Note: we discovered this when we upgraded the Kafka client in our Flink consumers from 2.4.1 to 2.8.1, and we subsequently had to downgrade the client. We also noticed this occurring in our non-Flink Java consumers running 3.x client versions.
>  
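The 2.5.1 behaviour described above, where clearFindCoordinatorFuture() is called on both success and failure, can be sketched the same way. This is again a hypothetical model rather than Kafka's code: ClearingCoordinatorLookup, the sendFindCoordinator supplier, and the CompletableFuture-based completion callback are assumptions. An in-flight request is still shared between callers, but once it completes either way the cached reference is reset, so the next lookup sends a fresh request.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of clearing the cached lookup on completion; NOT Kafka's code.
public class ClearingCoordinatorLookup {
    // Assumed stand-in for sending a FindCoordinator request to the cluster.
    private final Supplier<CompletableFuture<String>> sendFindCoordinator;
    private CompletableFuture<String> findCoordinatorFuture = null;
    int requestsSent = 0;

    ClearingCoordinatorLookup(Supplier<CompletableFuture<String>> sendFindCoordinator) {
        this.sendFindCoordinator = sendFindCoordinator;
    }

    CompletableFuture<String> lookupCoordinator() {
        // Reuse a request that is still in flight.
        if (findCoordinatorFuture != null) {
            return findCoordinatorFuture;
        }
        requestsSent++;
        CompletableFuture<String> request = sendFindCoordinator.get();
        findCoordinatorFuture = request;
        // Clear the cache on success AND failure (analogous to the 2.5.1
        // handler). Only clear if this request is still the cached one.
        request.whenComplete((node, error) -> {
            if (findCoordinatorFuture == request) {
                findCoordinatorFuture = null;
            }
        });
        // Return the local reference: if the request had already completed,
        // the callback above has already reset the field to null.
        return request;
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // The first simulated request fails; later requests succeed.
        ClearingCoordinatorLookup lookup = new ClearingCoordinatorLookup(() ->
                attempts[0]++ == 0
                        ? CompletableFuture.<String>failedFuture(new RuntimeException("coordinator unavailable"))
                        : CompletableFuture.completedFuture("internal-kaf05.company.corp:9092"));

        for (int i = 0; i < 3; i++) {
            CompletableFuture<String> f = lookup.lookupCoordinator();
            System.out.println("attempt " + i + ": failed=" + f.isCompletedExceptionally());
        }
        System.out.println("requests sent: " + lookup.requestsSent);
    }
}
```

Running main prints `failed=true` for the first attempt, `failed=false` for the next two, and `requests sent: 3`, because each already-completed future is cleared immediately and the following lookup issues a new request. The identity check in the callback avoids wiping out a newer request that has since replaced the cached one.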



--
This message was sent by Atlassian Jira
(v8.20.7#820007)