You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/07/18 06:20:30 UTC

[kafka] branch 3.3 updated: KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new a0a5290ea6 KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)
a0a5290ea6 is described below

commit a0a5290ea648e3e123b567a674d7b280616aa234
Author: Okada Haruki <oc...@gmail.com>
AuthorDate: Mon Jul 18 15:19:01 2022 +0900

    KAFKA-13572 Fix negative preferred replica imbalanced count metric (#12405)
    
    
    Currently, the preferredReplicaImbalanceCount calculation has a race that can make the count negative when topic deletions are initiated concurrently. This PR addresses the problem by ensuring cleanPreferredReplicaImbalanceMetric is called only once per topic during a topic-deletion procedure.
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../scala/kafka/controller/ControllerContext.scala  | 13 ++++++++++---
 .../kafka/controller/ControllerContextTest.scala    | 21 +++++++++++++++++++++
 2 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 379196aa1d..7065d87c4c 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -327,9 +327,16 @@ class ControllerContext {
     }
   }
 
-  def queueTopicDeletion(topics: Set[String]): Unit = {
-    topicsToBeDeleted ++= topics
-    topics.foreach(cleanPreferredReplicaImbalanceMetric)
+  def queueTopicDeletion(topicToBeAddedIntoDeletionList: Set[String]): Unit = {
+    // queueTopicDeletion may be called several times for the same topic, e.g.
+    // 1) delete topic-A, then 2) delete topic-B before topic-A's deletion completes;
+    // at step 2 the argument is Set(topic-A, topic-B). cleanPreferredReplicaImbalanceMetric
+    // must run only once per topic, otherwise preferredReplicaImbalanceCount can be
+    // decremented a second time for topic-A and go negative (KAFKA-13572). Hence only
+    // topics not already in topicsToBeDeleted are queued and have their metric cleaned.
+    val newlyDeletedTopics = topicToBeAddedIntoDeletionList.diff(topicsToBeDeleted) // not yet queued
+    topicsToBeDeleted ++= newlyDeletedTopics
+    newlyDeletedTopics.foreach(cleanPreferredReplicaImbalanceMetric)
+  }
 
   def beginTopicDeletion(topics: Set[String]): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
index e8efa5af79..e88bb321ad 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerContextTest.scala
@@ -203,4 +203,25 @@ class ControllerContextTest {
     context.removeTopic(tp3.topic)
     assertEquals(0, context.preferredReplicaImbalanceCount)
   }
+
+  @Test
+  def testPreferredReplicaImbalanceMetricOnConcurrentTopicDeletion(): Unit = {
+    val topicA = "A"
+    val topicB = "B"
+    val tpA = new TopicPartition(topicA, 0)
+    val tpB = new TopicPartition(topicB, 0)
+    context.updatePartitionFullReplicaAssignment(tpA, ReplicaAssignment(Seq(1, 2, 3)))
+    context.updatePartitionFullReplicaAssignment(tpB, ReplicaAssignment(Seq(1, 2, 3)))
+    assertEquals(0, context.preferredReplicaImbalanceCount)
+
+    context.queueTopicDeletion(Set(topicA))
+    // The deletion procedure marks every partition of the topic Offline; simulate that for tpA (no leader)
+    context.putPartitionLeadershipInfo(tpA, LeaderIsrAndControllerEpoch(LeaderAndIsr(LeaderAndIsr.NoLeader, List(1, 2, 3)), 0))
+    assertEquals(0, context.preferredReplicaImbalanceCount)
+
+    // Start topicB's deletion before topicA's deletion completes. Because topicA's
+    // delete-topic ZK node still exists, queueTopicDeletion receives Set(topicA, topicB).
+    context.queueTopicDeletion(Set(topicA, topicB))
+    assertEquals(0, context.preferredReplicaImbalanceCount) // topicA must not be cleaned twice (count would go negative)
+  }
 }