You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/27 18:15:25 UTC

[kafka] branch 2.7 updated: KAFKA-13233 Log zkVersion in more places (#11266)

This is an automated email from the ASF dual-hosted git repository.

davidarthur pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 39a52ee  KAFKA-13233 Log zkVersion in more places (#11266)
39a52ee is described below

commit 39a52ee91d7a51ed05610a6134dee71a2d03579f
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Aug 27 11:19:58 2021 -0400

    KAFKA-13233 Log zkVersion in more places (#11266)
    
    When debugging issues with partition state, it is very useful to know the zkVersion that was written. This patch adds the zkVersion of LeaderAndIsr to log messages in a few more places.
---
 .../src/main/scala/kafka/controller/KafkaController.scala | 15 ++++++++-------
 .../scala/kafka/controller/PartitionStateMachine.scala    |  7 +++++++
 .../main/scala/kafka/controller/ReplicaStateMachine.scala |  7 +++++++
 3 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index c5db8ca..91a33bd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1213,7 +1213,7 @@ class KafkaController(val config: KafkaConfig,
               val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
               controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
               finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
-              info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
+              info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}, zkVersion=${leaderAndIsr.zkVersion}")
               true
             case Some(Left(e)) => throw e
             case None => false
@@ -2308,17 +2308,17 @@ class KafkaController(val config: KafkaConfig,
               debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.zkVersion}]")
               partitionResponses(partition) = Right(updatedIsr)
               Some(partition -> updatedIsr)
-            case Left(error) =>
-              warn(s"Failed to update ISR for partition $partition", error)
-              partitionResponses(partition) = Left(Errors.forException(error))
+            case Left(e) =>
+              error(s"Failed to update ISR for partition $partition", e)
+              partitionResponses(partition) = Left(Errors.forException(e))
               None
           }
       }
 
-      badVersionUpdates.foreach(partition => {
-        debug(s"Failed to update ISR for partition $partition, bad ZK version")
+      badVersionUpdates.foreach { partition =>
+        info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition $partition, bad ZK version.")
         partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
-      })
+      }
 
       def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
         val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
@@ -2547,6 +2547,7 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
     leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
     leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
     leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
+    leaderAndIsrInfo.append(",ZkVersion:" + leaderAndIsr.zkVersion)
     leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
     leaderAndIsrInfo.toString()
   }
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 812eaaa..c363b70 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -444,6 +444,13 @@ class ZkPartitionStateMachine(config: KafkaConfig,
       }
     }
 
+    if (isDebugEnabled) {
+      updatesToRetry.foreach { partition =>
+        debug(s"Controller failed to elect leader for partition $partition. " +
+          s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
+      }
+    }
+
     (finishedUpdates ++ failedElections, updatesToRetry)
   }
 
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 15f44bc..c08d957 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -370,6 +370,13 @@ class ZkReplicaStateMachine(config: KafkaConfig,
         })
       }
 
+    if (isDebugEnabled) {
+      updatesToRetry.foreach { partition =>
+        debug(s"Controller failed to remove replica $replicaId from ISR of partition $partition. " +
+          s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
+      }
+    }
+
     (leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk, updatesToRetry)
   }