You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Ivan Yurchenko (Jira)" <ji...@apache.org> on 2023/01/02 14:29:00 UTC

[jira] [Updated] (KAFKA-8206) A consumer can't discover new group coordinator when the cluster was partly restarted

     [ https://issues.apache.org/jira/browse/KAFKA-8206?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Yurchenko updated KAFKA-8206:
----------------------------------
    Component/s: clients

> A consumer can't discover new group coordinator when the cluster was partly restarted
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8206
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8206
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.1.0, 3.2.0, 3.1.1, 3.3.0, 3.1.2, 3.2.1, 3.2.2, 3.2.3, 3.3.1
>            Reporter: alex gabriel
>            Assignee: Ivan Yurchenko
>            Priority: Critical
>              Labels: needs-kip
>
> *A consumer can't discover new group coordinator when the cluster was partly restarted*
> Preconditions:
> I use Kafka server and Java kafka-client lib 2.2 version
> I have 2 Kafka nodes running localy (localhost:9092, localhost:9093) and 1 ZK(localhost:2181)
> I have replication factor 2 for the all my topics and '_unclean.leader.election.enable=true_' on both Kafka nodes.
> Steps to reproduce:
> 1) Start 2nodes (localhost:9092/localhost:9093)
> 2) Start consumer with 'bootstrap.servers=localhost:9092,localhost:9093'
> {noformat}
> // discovered group coordinator (0-node)
> 2019-04-09 16:23:18,963 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> ...metadatacache is updated (2 nodes in the cluster list)
> 2019-04-09 16:23:18,928 DEBUG [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Sending metadata request (type=MetadataRequest, topics=<ALL>) to node localhost:9092 (id: -1 rack: null)>
> 2019-04-09 16:23:18,940 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 2 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null), localhost:9093 (id: 1 rack: null)], partitions = [], controller = localhost:9092 (id: 0 rack: null))}>
> {noformat}
> 3) Shutdown 1-node (localhost:9093)
> {noformat}
> // metadata was updated to the 4 version (but for some reasons it still had 2 alive nodes inside cluster)
> 2019-04-09 16:23:46,981 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 4 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9093 (id: 1 rack: null), localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = []), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0,1], offlineReplicas = [])], controller = localhost:9092 (id: 0 rack: null))}>
> //consumers thinks that node-1 is still alive and try to send coordinator lookup to it but failed
> 2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9093 (id: 2147483646 rack: null)>
> 2019-04-09 16:23:46,981 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group coordinator localhost:9093 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery>
> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 1 disconnected.>
> 2019-04-09 16:24:01,117 WARN [org.apache.kafka.clients.NetworkClient.processDisconnection] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Connection to node 1 (localhost:9093) could not be established. Broker may not be available.>
> // refreshing metadata again
> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled request with header RequestHeader(apiKey=FIND_COORDINATOR, apiVersion=2, clientId=events-consumer0, correlationId=112) due to node 1 being disconnected>
> 2019-04-09 16:24:01,117 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Coordinator discovery failed, refreshing metadata>
> // metadata was updated to the 5 version where cluster had only 0-node localhost:9092 as expected.
> 2019-04-09 16:24:01,131 DEBUG [org.apache.kafka.clients.Metadata.update] - Updated cluster metadata version 5 to MetadataCache{cluster=Cluster(id = P3pz1xU0SjK-Dhy6h2G5YA, nodes = [localhost:9092 (id: 0 rack: null)], partitions = [Partition(topic = events-sorted, partition = 1, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1]), Partition(topic = events-sorted, partition = 0, leader = 0, replicas = [0,1], isr = [0], offlineReplicas = [1])], controller = localhost:9092 (id: 0 rack: null))}>
> // 0-node discovered as coordinator
> 2019-04-09 16:24:01,132 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator$FindCoordinatorResponseHandler.onSuccess] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)>
> {noformat}
> *At this point consumer stores only information about 0-node(localhost:9092) inside cluster property of the metadata cache.*
> 4) Shutdown 0-node (localhost:9092)
> 5) Start 1-node (localhost:9093)
> {noformat}
> //consumer tries to re-connect only to the 0-node
> 2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.common.network.Selector.pollSelectionKeys] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Connection with localhost disconnected>
> 2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 0 disconnected.>
> 2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.clients.NetworkClient.handleDisconnections] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Node 2147483647 disconnected.>
> 2019-04-09 16:24:40,649 DEBUG [org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=events-consumer0, correlationId=209) due to node 0 being disconnected>
> 2019-04-09 16:24:40,649 INFO [org.apache.kafka.clients.FetchSessionHandler.handleError] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Error sending fetch request (sessionId=1754516055, epoch=132) to node 0: org.apache.kafka.common.errors.DisconnectException.>
> 2019-04-09 16:24:40,650 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.markCoordinatorUnknown] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Group coordinator localhost:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery>
> 2019-04-09 16:24:40,650 DEBUG [org.apache.kafka.clients.consumer.internals.AbstractCoordinator.lookupCoordinator] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] No broker available to send FindCoordinator request>
> 2019-04-09 16:24:40,650 DEBUG [org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate] - [Consumer clientId=events-consumer0, groupId=events-group-gabriel] Give up sending metadata request since no node is available>
> {noformat}
> As I see the consumer tries to initialize connection only to the 0-node cause this is only node inside consumer's cluster metadata cache, thus consumer can't get a connection to the new group coordinator 1-node and as result can't poll any messages
> _To resolve this pending state 0-node must be restored OR consumer must be re-started(in this case consumer discover a lastly started node(1-node in my case) from the initial brokers list)_
> p.s. The same behavior could be reproduced using a 3 nodes cluster.
> _The question: is this by design that consumer stores metadata cache with only active nodes list?_
> Since it leads to a situation when a last active node from the list goes down and the other node come to life but the consumer doesn't have any info about that node and trying to re-connect to the last active node ignoring new node.
> From my point of view, a consumer should always store some initial nodes list and try to reconnect to the nodes from the initial list in case if there are no alive nodes from the metadata cluster cache.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)