You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Mike Mintz (JIRA)" <ji...@apache.org> on 2019/05/16 00:07:00 UTC

[jira] [Created] (KAFKA-8374) KafkaApis.handleLeaderAndIsrRequest not robust to ZooKeeper exceptions

Mike Mintz created KAFKA-8374:
---------------------------------

             Summary: KafkaApis.handleLeaderAndIsrRequest not robust to ZooKeeper exceptions
                 Key: KAFKA-8374
                 URL: https://issues.apache.org/jira/browse/KAFKA-8374
             Project: Kafka
          Issue Type: Bug
          Components: core, offset manager
    Affects Versions: 2.0.1
         Environment: Linux x86_64 (Ubuntu Xenial) running on AWS EC2
            Reporter: Mike Mintz


h2. Summary of bug (theory)

During a leader election, when a broker is transitioning from leader to follower on some __consumer_offset partitions and some __transaction_state partitions, it’s possible for a ZooKeeper exception to be thrown, leaving the GroupMetadataManager in an inconsistent state.

 
In particular, in KafkaApis.handleLeaderAndIsrRequest in the onLeadershipChange callback, it’s possible for TransactionCoordinator.handleTxnEmigration to throw ZooKeeperClientExpiredException, ending the updatedFollowers.foreach loop early. If there were any __consumer_offset partitions to be handled later in the loop, GroupMetadataManager will be left with stale data in its groupMetadataCache. Later, when this broker resumes leadership for the affected __consumer_offset partitions, it will fail to load the updated groups into the cache since it uses putIfNotExists, and it will serve stale offsets to consumers.
 
h2. Details of what we experienced
We ran into this issue running Kafka 2.0.1 in production. Several Kafka consumers received stale offsets when reconnecting to their group coordinator after a leadership election on their __consumer_offsets partition. This caused them to reprocess many duplicate messages.
 
We believe we’ve tracked down the root cause: * On 2019-04-01, we were having memory pressure in ZooKeeper, and we were getting several ZooKeeperClientExpiredException errors in the logs.
 * The impacted consumers were all in __consumer_offsets-15. There was a leader election on this partition, and leadership transitioned from broker 1088 to broker 1069. During this leadership election, the former leader (1088) saw a ZooKeeperClientExpiredException  (stack trace below). This happened inside KafkaApis.handleLeaderAndIsrRequest, specifically in onLeadershipChange while it was updating a __transaction_state partition. Since there are no “Scheduling unloading” or “Finished unloading” log messages in this period, we believe it threw this exception before getting to __consumer_offsets-15, so it never got a chance to call GroupCoordinator.handleGroupEmigration, which means this broker didn’t unload offsets from its GroupMetadataManager.
 * Four days later, on 2019-04-05, we manually restarted broker 1069, so broker 1088 became the leader of __consumer_offsets-15 again. When it ran GroupMetadataManager.loadGroup, it presumably failed to update groupMetadataCache since it uses putIfNotExists, and it would have found the group id already in the cache. Unfortunately we did not have debug logging enabled, but I would expect to have seen a log message like "Attempt to load group ${group.groupId} from log with generation ${group.generationId} failed because there is already a cached group with generation ${currentGroup.generationId}".
 * After the leadership election, the impacted consumers reconnected to broker 1088 and received stale offsets that correspond to the last committed offsets around 2019-04-01.

 
h2. Relevant log/stacktrace
{code:java}
[2019-04-01 22:44:18.968617] [2019-04-01 22:44:18,963] ERROR [KafkaApi-1088] Error when handling request {controller_id=1096,controller_epoch=122,partition_states=[...,{topic=__consumer_offsets,partition=15,controller_epoch=122,leader=1069,leader_epoch=440,isr=[1092,1088,1069],zk_version=807,replicas=[1069,1088,1092],is_new=false},...],live_leaders=[{id=1069,host=10.68.42.121,port=9094}]} (kafka.server.KafkaApis)
[2019-04-01 22:44:18.968689] kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
[2019-04-01 22:44:18.968712]         at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:238)
[2019-04-01 22:44:18.968736]         at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968759]         at kafka.zookeeper.ZooKeeperClient$$anonfun$kafka$zookeeper$ZooKeeperClient$$waitUntilConnected$1.apply(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968776]         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[2019-04-01 22:44:18.968804]         at kafka.zookeeper.ZooKeeperClient.kafka$zookeeper$ZooKeeperClient$$waitUntilConnected(ZooKeeperClient.scala:226)
[2019-04-01 22:44:18.968836]         at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply$mcV$sp(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968863]         at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968891]         at kafka.zookeeper.ZooKeeperClient$$anonfun$waitUntilConnected$1.apply(ZooKeeperClient.scala:220)
[2019-04-01 22:44:18.968941]         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
[2019-04-01 22:44:18.968972]         at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:219)
[2019-04-01 22:44:18.968997]         at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1510)
[2019-04-01 22:44:18.969020]         at kafka.zk.KafkaZkClient.getReplicaAssignmentForTopics(KafkaZkClient.scala:463)
[2019-04-01 22:44:18.969062]         at kafka.zk.KafkaZkClient.getTopicPartitionCount(KafkaZkClient.scala:514)
[2019-04-01 22:44:18.969118]         at kafka.coordinator.transaction.TransactionStateManager.getTransactionTopicPartitionCount(TransactionStateManager.scala:280)
[2019-04-01 22:44:18.969168]         at kafka.coordinator.transaction.TransactionStateManager.validateTransactionTopicPartitionCountIsStable(TransactionStateManager.scala:466)
[2019-04-01 22:44:18.969206]         at kafka.coordinator.transaction.TransactionStateManager.removeTransactionsForTxnTopicPartition(TransactionStateManager.scala:435)
[2019-04-01 22:44:18.969239]         at kafka.coordinator.transaction.TransactionCoordinator.handleTxnEmigration(TransactionCoordinator.scala:282)
[2019-04-01 22:44:18.969266]         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:180)
[2019-04-01 22:44:18.969293]         at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$onLeadershipChange$1$2.apply(KafkaApis.scala:176)
[2019-04-01 22:44:18.969316]         at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
[2019-04-01 22:44:18.969341]         at kafka.server.KafkaApis.kafka$server$KafkaApis$$onLeadershipChange$1(KafkaApis.scala:176)
[2019-04-01 22:44:18.969361]         at kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
[2019-04-01 22:44:18.969383]         at kafka.server.KafkaApis$$anonfun$4.apply(KafkaApis.scala:185)
[2019-04-01 22:44:18.969412]         at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1117)
[2019-04-01 22:44:18.969435]         at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:185)
[2019-04-01 22:44:18.969454]         at kafka.server.KafkaApis.handle(KafkaApis.scala:110)
[2019-04-01 22:44:18.969476]         at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
[2019-04-01 22:44:18.969513]         at java.lang.Thread.run(Thread.java:748)
{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)