You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/18 06:19:12 UTC
[kafka] branch trunk updated: KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ab9aaea3ce KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)
ab9aaea3ce is described below
commit ab9aaea3ce498db80ca55d0340f47abf1d975b07
Author: Okada Haruki <oc...@gmail.com>
AuthorDate: Mon Jul 18 15:19:01 2022 +0900
KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)
Currently, the preferredReplicaImbalanceCount calculation has a race condition that can make the metric negative when multiple topic deletions are initiated concurrently. This PR addresses the problem by ensuring that cleanPreferredReplicaImbalanceMetric is called only once per topic-deletion procedure.
Reviewers: Luke Chen <sh...@gmail.com>
---
.../scala/kafka/controller/ControllerContext.scala | 13 ++++++++++---
.../kafka/controller/ControllerContextTest.scala | 21 +++++++++++++++++++++
2 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 379196aa1d..7065d87c4c 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -327,9 +327,16 @@ class ControllerContext {
}
}
- def queueTopicDeletion(topics: Set[String]): Unit = {
- topicsToBeDeleted ++= topics
- topics.foreach(cleanPreferredReplicaImbalanceMetric)
+ def queueTopicDeletion(topicToBeAddedIntoDeletionList: Set[String]): Unit = {
+ // queueTopicDeletion could be called multiple times for same topic.
+ // e.g. 1) delete topic-A => 2) delete topic-B before A's deletion completes.
+ // In this case, at 2), queueTopicDeletion will be called with Set(topic-A, topic-B).
+ // However we should call cleanPreferredReplicaImbalanceMetric only once for same topic
+ // because otherwise, preferredReplicaImbalanceCount could be decremented wrongly at 2nd call.
+ // So we need to take a diff with already queued topics here.
+ val newlyDeletedTopics = topicToBeAddedIntoDeletionList.diff(topicsToBeDeleted)
+ topicsToBeDeleted ++= newlyDeletedTopics
+ newlyDeletedTopics.foreach(cleanPreferredReplicaImbalanceMetric)
}
def beginTopicDeletion(topics: Set[String]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index e8efa5af79..e88bb321ad 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -203,4 +203,25 @@ class ControllerContextTest {
context.removeTopic(tp3.topic)
assertEquals(0, context.preferredReplicaImbalanceCount)
}
+
+ @Test
+ def testPreferredReplicaImbalanceMetricOnConcurrentTopicDeletion(): Unit = {
+ val topicA = "A"
+ val topicB = "B"
+ val tpA = new TopicPartition(topicA, 0)
+ val tpB = new TopicPartition(topicB, 0)
+ context.updatePartitionFullReplicaAssignment(tpA, ReplicaAssignment(Seq(1, 2, 3)))
+ context.updatePartitionFullReplicaAssignment(tpB, ReplicaAssignment(Seq(1, 2, 3)))
+ assertEquals(0, context.preferredReplicaImbalanceCount)
+
+ context.queueTopicDeletion(Set(topicA))
+ // All partitions in topic will be marked as Offline during deletion procedure
+ context.putPartitionLeadershipInfo(tpA, LeaderIsrAndControllerEpoch(LeaderAndIsr(LeaderAndIsr.NoLeader, List(1, 2, 3)), 0))
+ assertEquals(0, context.preferredReplicaImbalanceCount)
+
+ // Initiate topicB's topic deletion before topicA's deletion completes.
+ // Since topicA's delete-topic ZK node still exists, context.queueTopicDeletion will be called with Set(topicA, topicB)
+ context.queueTopicDeletion(Set(topicA, topicB))
+ assertEquals(0, context.preferredReplicaImbalanceCount)
+ }
}