You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/17 22:40:30 UTC
[kafka] branch trunk updated: MINOR: Remove redundant metric reset in KafkaController (#12158)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f36de0744b9 MINOR: Remove redundant metric reset in KafkaController (#12158)
f36de0744b9 is described below
commit f36de0744b915335de6b636e6bd6b5f1276f34f6
Author: bozhao12 <10...@users.noreply.github.com>
AuthorDate: Wed May 18 06:40:05 2022 +0800
MINOR: Remove redundant metric reset in KafkaController (#12158)
The following variables in `KafkaController` are used for metrics:
```
offlinePartitionCount
preferredReplicaImbalanceCount
globalTopicCount
globalPartitionCount
topicsToDeleteCount
replicasToDeleteCount
ineligibleTopicsToDeleteCount
ineligibleReplicasToDeleteCount
```
When the controller goes from active to non-active, these variables will be reset to 0. Currently, this is done explicitly in `KafkaController.onControllerResignation()` and also after every loop iteration in `KafkaController.updateMetrics()`.
The first of these is redundant and can be removed. This patch fixes this and also simplifies `updateMetrics`.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/controller/KafkaController.scala | 76 +++++++++-------------
1 file changed, 31 insertions(+), 45 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 45c22435e68..f4a8569b803 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -468,14 +468,6 @@ class KafkaController(val config: KafkaConfig,
// shutdown leader rebalance scheduler
kafkaScheduler.shutdown()
- offlinePartitionCount = 0
- preferredReplicaImbalanceCount = 0
- globalTopicCount = 0
- globalPartitionCount = 0
- topicsToDeleteCount = 0
- replicasToDeleteCount = 0
- ineligibleTopicsToDeleteCount = 0
- ineligibleReplicasToDeleteCount = 0
// stop token expiry check scheduler
if (tokenCleanScheduler.isStarted)
@@ -1435,43 +1427,37 @@ class KafkaController(val config: KafkaConfig,
}
private def updateMetrics(): Unit = {
- offlinePartitionCount =
- if (!isActive) {
- 0
- } else {
- controllerContext.offlinePartitionCount
- }
-
- preferredReplicaImbalanceCount =
- if (!isActive) {
- 0
- } else {
- controllerContext.preferredReplicaImbalanceCount
- }
-
- globalTopicCount = if (!isActive) 0 else controllerContext.allTopics.size
-
- globalPartitionCount = if (!isActive) 0 else controllerContext.partitionWithLeadersCount
-
- topicsToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.size
-
- replicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
- // For each enqueued topic, count the number of replicas that are not yet deleted
- controllerContext.replicasForTopic(topic).count { replica =>
- controllerContext.replicaState(replica) != ReplicaDeletionSuccessful
- }
- }.sum
-
- ineligibleTopicsToDeleteCount = if (!isActive) 0 else controllerContext.topicsIneligibleForDeletion.size
-
- ineligibleReplicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
- // For each enqueued topic, count the number of replicas that are ineligible
- controllerContext.replicasForTopic(topic).count { replica =>
- controllerContext.replicaState(replica) == ReplicaDeletionIneligible
- }
- }.sum
-
- activeBrokerCount = if (isActive) controllerContext.liveOrShuttingDownBrokerIds.size else 0
+ if (isActive) {
+ offlinePartitionCount = controllerContext.offlinePartitionCount
+ preferredReplicaImbalanceCount = controllerContext.preferredReplicaImbalanceCount
+ globalTopicCount = controllerContext.allTopics.size
+ globalPartitionCount = controllerContext.partitionWithLeadersCount
+ topicsToDeleteCount = controllerContext.topicsToBeDeleted.size
+ replicasToDeleteCount = controllerContext.topicsToBeDeleted.map { topic =>
+ // For each enqueued topic, count the number of replicas that are not yet deleted
+ controllerContext.replicasForTopic(topic).count { replica =>
+ controllerContext.replicaState(replica) != ReplicaDeletionSuccessful
+ }
+ }.sum
+ ineligibleTopicsToDeleteCount = controllerContext.topicsIneligibleForDeletion.size
+ ineligibleReplicasToDeleteCount = controllerContext.topicsToBeDeleted.map { topic =>
+ // For each enqueued topic, count the number of replicas that are ineligible
+ controllerContext.replicasForTopic(topic).count { replica =>
+ controllerContext.replicaState(replica) == ReplicaDeletionIneligible
+ }
+ }.sum
+ activeBrokerCount = controllerContext.liveOrShuttingDownBrokerIds.size
+ } else {
+ offlinePartitionCount = 0
+ preferredReplicaImbalanceCount = 0
+ globalTopicCount = 0
+ globalPartitionCount = 0
+ topicsToDeleteCount = 0
+ replicasToDeleteCount = 0
+ ineligibleTopicsToDeleteCount = 0
+ ineligibleReplicasToDeleteCount = 0
+ activeBrokerCount = 0
+ }
}
// visible for testing