You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2021/08/27 15:21:37 UTC
[kafka] branch trunk updated: KAFKA-13233 Log zkVersion in more places (#11266)
This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new edd5a24 KAFKA-13233 Log zkVersion in more places (#11266)
edd5a24 is described below
commit edd5a249e905670b9d7f806250d83bff777afa28
Author: David Arthur <mu...@gmail.com>
AuthorDate: Fri Aug 27 11:19:58 2021 -0400
KAFKA-13233 Log zkVersion in more places (#11266)
When debugging issues with partition state, it's very useful to know the zkVersion that was written. This patch adds the zkVersion of LeaderAndIsr to log messages in a few more places, and to the string representation of LeaderIsrAndControllerEpoch.
---
.../src/main/scala/kafka/controller/KafkaController.scala | 15 ++++++++-------
.../scala/kafka/controller/PartitionStateMachine.scala | 7 +++++++
.../main/scala/kafka/controller/ReplicaStateMachine.scala | 7 +++++++
3 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 81b193e..1d36f52 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1220,7 +1220,7 @@ class KafkaController(val config: KafkaConfig,
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, epoch)
controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch)
finalLeaderIsrAndControllerEpoch = Some(leaderIsrAndControllerEpoch)
- info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}")
+ info(s"Updated leader epoch for partition $partition to ${leaderAndIsr.leaderEpoch}, zkVersion=${leaderAndIsr.zkVersion}")
true
case Some(Left(e)) => throw e
case None => false
@@ -2344,17 +2344,17 @@ class KafkaController(val config: KafkaConfig,
debug(s"ISR for partition $partition updated to [${updatedIsr.isr.mkString(",")}] and zkVersion updated to [${updatedIsr.zkVersion}]")
partitionResponses(partition) = Right(updatedIsr)
Some(partition -> updatedIsr)
- case Left(error) =>
- warn(s"Failed to update ISR for partition $partition", error)
- partitionResponses(partition) = Left(Errors.forException(error))
+ case Left(e) =>
+ error(s"Failed to update ISR for partition $partition", e)
+ partitionResponses(partition) = Left(Errors.forException(e))
None
}
}
- badVersionUpdates.foreach(partition => {
- debug(s"Failed to update ISR for partition $partition, bad ZK version")
+ badVersionUpdates.foreach { partition =>
+ info(s"Failed to update ISR to ${adjustedIsrs(partition)} for partition $partition, bad ZK version.")
partitionResponses(partition) = Left(Errors.INVALID_UPDATE_VERSION)
- })
+ }
def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {
val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeq
@@ -2633,6 +2633,7 @@ case class LeaderIsrAndControllerEpoch(leaderAndIsr: LeaderAndIsr, controllerEpo
leaderAndIsrInfo.append("(Leader:" + leaderAndIsr.leader)
leaderAndIsrInfo.append(",ISR:" + leaderAndIsr.isr.mkString(","))
leaderAndIsrInfo.append(",LeaderEpoch:" + leaderAndIsr.leaderEpoch)
+ leaderAndIsrInfo.append(",ZkVersion:" + leaderAndIsr.zkVersion)
leaderAndIsrInfo.append(",ControllerEpoch:" + controllerEpoch + ")")
leaderAndIsrInfo.toString()
}
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 17d6fde..e812a14 100755
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -437,6 +437,13 @@ class ZkPartitionStateMachine(config: KafkaConfig,
}
}
+ if (isDebugEnabled) {
+ updatesToRetry.foreach { partition =>
+ debug(s"Controller failed to elect leader for partition $partition. " +
+ s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
+ }
+ }
+
(finishedUpdates ++ failedElections, updatesToRetry)
}
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index f56d28d..5b41e66 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -370,6 +370,13 @@ class ZkReplicaStateMachine(config: KafkaConfig,
})
}
+ if (isDebugEnabled) {
+ updatesToRetry.foreach { partition =>
+ debug(s"Controller failed to remove replica $replicaId from ISR of partition $partition. " +
+ s"Attempted to write state ${adjustedLeaderAndIsrs(partition)}, but failed with bad ZK version. This will be retried.")
+ }
+ }
+
(leaderIsrAndControllerEpochs ++ exceptionsForPartitionsWithNoLeaderAndIsrInZk, updatesToRetry)
}