Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/17 22:40:30 UTC

[kafka] branch trunk updated: MINOR: Remove redundant metric reset in KafkaController (#12158)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f36de0744b9 MINOR: Remove redundant metric reset in KafkaController (#12158)
f36de0744b9 is described below

commit f36de0744b915335de6b636e6bd6b5f1276f34f6
Author: bozhao12 <10...@users.noreply.github.com>
AuthorDate: Wed May 18 06:40:05 2022 +0800

    MINOR: Remove redundant metric reset in KafkaController (#12158)
    
    The following variables in `KafkaController` are used for metrics:
    ```
        offlinePartitionCount
        preferredReplicaImbalanceCount
        globalTopicCount
        globalPartitionCount
        topicsToDeleteCount
        replicasToDeleteCount
        ineligibleTopicsToDeleteCount
        ineligibleReplicasToDeleteCount
    ```
    When the controller transitions from active to non-active, these variables are reset to 0. Currently, this is done explicitly in `KafkaController.onControllerResignation()` and again after every iteration of the controller event loop, in `KafkaController.updateMetrics()`.
    The first of these is redundant and can be removed. This patch removes it and also simplifies `updateMetrics`.
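    
    For illustration only (hypothetical names, not the patched code): each of these vars backs a gauge that simply reads the var, so zeroing them in a single place is sufficient. A minimal self-contained Scala sketch of that pattern:
    ```
    // Minimal sketch, not KafkaController itself: a gauge is a deferred
    // read of a @volatile var, so the single reset in updateMetrics()
    // covers controller resignation on its own.
    object MetricResetSketch {
      @volatile private var offlinePartitionCount = 0
    
      // Registering a gauge amounts to handing the metrics library a
      // thunk that reads the var; here we keep the thunk directly.
      private val offlinePartitionsGauge: () => Int = () => offlinePartitionCount
    
      // Called after every controller event, mirroring updateMetrics():
      // the active controller publishes real counts, everyone else 0.
      def updateMetrics(isActive: Boolean, currentCount: => Int): Unit =
        offlinePartitionCount = if (isActive) currentCount else 0
    
      def main(args: Array[String]): Unit = {
        updateMetrics(isActive = true, currentCount = 3)
        println(offlinePartitionsGauge()) // 3
        updateMetrics(isActive = false, currentCount = 3)
        println(offlinePartitionsGauge()) // 0: no separate reset needed
      }
    }
    ```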
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/controller/KafkaController.scala   | 76 +++++++++-------------
 1 file changed, 31 insertions(+), 45 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 45c22435e68..f4a8569b803 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -468,14 +468,6 @@ class KafkaController(val config: KafkaConfig,
 
     // shutdown leader rebalance scheduler
     kafkaScheduler.shutdown()
-    offlinePartitionCount = 0
-    preferredReplicaImbalanceCount = 0
-    globalTopicCount = 0
-    globalPartitionCount = 0
-    topicsToDeleteCount = 0
-    replicasToDeleteCount = 0
-    ineligibleTopicsToDeleteCount = 0
-    ineligibleReplicasToDeleteCount = 0
 
     // stop token expiry check scheduler
     if (tokenCleanScheduler.isStarted)
@@ -1435,43 +1427,37 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def updateMetrics(): Unit = {
-    offlinePartitionCount =
-      if (!isActive) {
-        0
-      } else {
-        controllerContext.offlinePartitionCount
-      }
-
-    preferredReplicaImbalanceCount =
-      if (!isActive) {
-        0
-      } else {
-        controllerContext.preferredReplicaImbalanceCount
-      }
-
-    globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
-
-    globalPartitionCount = if (!isActive) 0 else controllerContext.partitionWithLeadersCount
-
-    topicsToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.size
-
-    replicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
-      // For each enqueued topic, count the number of replicas that are not yet deleted
-      controllerContext.replicasForTopic(topic).count { replica =>
-        controllerContext.replicaState(replica) != ReplicaDeletionSuccessful
-      }
-    }.sum
-
-    ineligibleTopicsToDeleteCount = if (!isActive) 0 else controllerContext.topicsIneligibleForDeletion.size
-
-    ineligibleReplicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
-      // For each enqueued topic, count the number of replicas that are ineligible
-      controllerContext.replicasForTopic(topic).count { replica =>
-        controllerContext.replicaState(replica) == ReplicaDeletionIneligible
-      }
-    }.sum
-
-    activeBrokerCount = if (isActive) controllerContext.liveOrShuttingDownBrokerIds.size else 0
+    if (isActive) {
+      offlinePartitionCount = controllerContext.offlinePartitionCount
+      preferredReplicaImbalanceCount = controllerContext.preferredReplicaImbalanceCount
+      globalTopicCount = controllerContext.allTopics.size
+      globalPartitionCount = controllerContext.partitionWithLeadersCount
+      topicsToDeleteCount = controllerContext.topicsToBeDeleted.size
+      replicasToDeleteCount = controllerContext.topicsToBeDeleted.map { topic =>
+        // For each enqueued topic, count the number of replicas that are not yet deleted
+        controllerContext.replicasForTopic(topic).count { replica =>
+          controllerContext.replicaState(replica) != ReplicaDeletionSuccessful
+        }
+      }.sum
+      ineligibleTopicsToDeleteCount = controllerContext.topicsIneligibleForDeletion.size
+      ineligibleReplicasToDeleteCount = controllerContext.topicsToBeDeleted.map { topic =>
+        // For each enqueued topic, count the number of replicas that are ineligible
+        controllerContext.replicasForTopic(topic).count { replica =>
+          controllerContext.replicaState(replica) == ReplicaDeletionIneligible
+        }
+      }.sum
+      activeBrokerCount = controllerContext.liveOrShuttingDownBrokerIds.size
+    } else {
+      offlinePartitionCount = 0
+      preferredReplicaImbalanceCount = 0
+      globalTopicCount = 0
+      globalPartitionCount = 0
+      topicsToDeleteCount = 0
+      replicasToDeleteCount = 0
+      ineligibleTopicsToDeleteCount = 0
+      ineligibleReplicasToDeleteCount = 0
+      activeBrokerCount = 0
+    }
   }
 
   // visible for testing