You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/27 03:31:15 UTC

[GitHub] [kafka] dajac opened a new pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

dajac opened a new pull request #8724:
URL: https://github.com/apache/kafka/pull/8724


   TODO
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
stanislavkozlovski commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r432986275



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],

Review comment:
       nit: I think this exact argument doesn't need to be an Option




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r435076849



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet
+  }
+
+  def partitionsWithoutLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) =>
+      !isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) &&
+        !isTopicQueuedUpForDeletion(topicPartition.topic)
+    }.keySet
+  }
+
+  def partitionLeadsOnBroker(brokerId: Int): Set[TopicPartition] = {

Review comment:
       That makes sense.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434262971



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1254,23 +1254,7 @@ class KafkaController(val config: KafkaConfig,
       if (!isActive) {
         0
       } else {
-        controllerContext.allPartitions.count { topicPartition =>

Review comment:
       Ouch. So the main issue is that we were doing this full iteration overall partitions on _every_ controller event.

##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
+                                                    newLeadershipInfo: Option[LeaderIsrAndControllerEpoch]): Unit = {
+    if (!isTopicQueuedUpForDeletion(partition.topic)) {
+      oldReplicaAssignment.foreach { replicaAssignment =>
+        oldLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount -= 1
+        }
+      }
+
+      newReplicaAssignment.foreach { replicaAssignment =>
+        newLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount += 1
+        }
+      }
+    }
+  }
+
+  private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
+      partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
+        if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+          preferredReplicaImbalanceCount -= 1
+      }
+    }
+  }
+
+  private def isReplicaImbalance(replicaAssignment: ReplicaAssignment,

Review comment:
       What about `hasPreferredLeader`?

##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {

Review comment:
       Wonder if we should consider making `partitionLeadershipInfo` private so that we do not mistakenly invoke `put` in the future directly.

##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
+                                                    newLeadershipInfo: Option[LeaderIsrAndControllerEpoch]): Unit = {
+    if (!isTopicQueuedUpForDeletion(partition.topic)) {
+      oldReplicaAssignment.foreach { replicaAssignment =>
+        oldLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount -= 1
+        }
+      }
+
+      newReplicaAssignment.foreach { replicaAssignment =>
+        newLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount += 1
+        }
+      }
+    }
+  }
+
+  private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
+      partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
+        if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+          preferredReplicaImbalanceCount -= 1
+      }
+    }
+  }
+
+  private def isReplicaImbalance(replicaAssignment: ReplicaAssignment,
+                                 leadershipInfo: LeaderIsrAndControllerEpoch): Boolean = {
+    val preferredReplica = replicaAssignment.replicas.head
+    if (replicaAssignment.isBeingReassigned && replicaAssignment.addingReplicas.contains(preferredReplica))
+      // reassigning partitions are not counted as imbalanced until the new replica joins the ISR (completes reassignment)

Review comment:
       Not something to change here, but it's a little curious that we only do this when a reassignment is in progress. I'm not sure it's useful to distinguish that case as opposed to a broker that is catching up after a restart. If the preferred leader is not in the ISR, then we can't elect it anyway. Seems like it might be more useful if this metric captured the "eligible imbalance". 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #8724:
URL: https://github.com/apache/kafka/pull/8724


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r435085001



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet

Review comment:
       Right. I did not pay attention to this detail but that makes total sense to use a better name here. Let's go with `partitionsWithOfflineLeaders`.
   We can indeed exclude topics which are being queued for deletion. It is already done in `partitionsWithOfflineLeaders` but I have added it in `partitionsWithLeaders`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-639189300


   I will go ahead and merge this. I build locally with JDK11 and Scala 2.13.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434361213



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
+                                                    newLeadershipInfo: Option[LeaderIsrAndControllerEpoch]): Unit = {
+    if (!isTopicQueuedUpForDeletion(partition.topic)) {
+      oldReplicaAssignment.foreach { replicaAssignment =>
+        oldLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount -= 1
+        }
+      }
+
+      newReplicaAssignment.foreach { replicaAssignment =>
+        newLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount += 1
+        }
+      }
+    }
+  }
+
+  private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
+      partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
+        if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+          preferredReplicaImbalanceCount -= 1
+      }
+    }
+  }
+
+  private def isReplicaImbalance(replicaAssignment: ReplicaAssignment,
+                                 leadershipInfo: LeaderIsrAndControllerEpoch): Boolean = {
+    val preferredReplica = replicaAssignment.replicas.head
+    if (replicaAssignment.isBeingReassigned && replicaAssignment.addingReplicas.contains(preferredReplica))
+      // reassigning partitions are not counted as imbalanced until the new replica joins the ISR (completes reassignment)

Review comment:
       I agree with you.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434360661



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],

Review comment:
       Actually, in `updatePartitionFullReplicaAssignment`, `partitionLeadershipInfo.get(topicPartition)` returns an `Option` so it is more convenient to keep it as an `Option`. Ok for you?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-639033169


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-634134203


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638305368


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638306026


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434262971



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1254,23 +1254,7 @@ class KafkaController(val config: KafkaConfig,
       if (!isActive) {
         0
       } else {
-        controllerContext.allPartitions.count { topicPartition =>

Review comment:
       Ouch. So the main issue is that we were doing this full iteration over all partitions on _every_ controller event.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434360795



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,54 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putLeadershipInfo(partition: TopicPartition, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  private def updatePreferredReplicaImbalanceMetric(partition: TopicPartition,
+                                                    oldReplicaAssignment: Option[ReplicaAssignment],
+                                                    oldLeadershipInfo: Option[LeaderIsrAndControllerEpoch],
+                                                    newReplicaAssignment: Option[ReplicaAssignment],
+                                                    newLeadershipInfo: Option[LeaderIsrAndControllerEpoch]): Unit = {
+    if (!isTopicQueuedUpForDeletion(partition.topic)) {
+      oldReplicaAssignment.foreach { replicaAssignment =>
+        oldLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount -= 1
+        }
+      }
+
+      newReplicaAssignment.foreach { replicaAssignment =>
+        newLeadershipInfo.foreach { leadershipInfo =>
+          if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+            preferredReplicaImbalanceCount += 1
+        }
+      }
+    }
+  }
+
+  private def cleanPreferredReplicaImbalanceMetric(topic: String): Unit = {
+    partitionAssignments.getOrElse(topic, mutable.Map.empty).foreach { case (partition, replicaAssignment) =>
+      partitionLeadershipInfo.get(new TopicPartition(topic, partition)).foreach { leadershipInfo =>
+        if (isReplicaImbalance(replicaAssignment, leadershipInfo))
+          preferredReplicaImbalanceCount -= 1
+      }
+    }
+  }
+
+  private def isReplicaImbalance(replicaAssignment: ReplicaAssignment,

Review comment:
       Yes, that is fine. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638711111


   @hachikuji Thanks for the review. I have updated the PR based on your feedback.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638962844


   ok to test


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#issuecomment-638305947


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #8724: KAFKA-10040; Make computing the PreferredReplicaImbalanceCount metric more efficient

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #8724:
URL: https://github.com/apache/kafka/pull/8724#discussion_r434702023



##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet
+  }
+
+  def partitionsWithoutLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.filter { case (topicPartition, leaderIsrAndControllerEpoch) =>
+      !isReplicaOnline(leaderIsrAndControllerEpoch.leaderAndIsr.leader, topicPartition) &&
+        !isTopicQueuedUpForDeletion(topicPartition.topic)
+    }.keySet
+  }
+
+  def partitionLeadsOnBroker(brokerId: Int): Set[TopicPartition] = {

Review comment:
       nit: `partitionLeadersOnBroker`?

##########
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##########
@@ -391,6 +404,90 @@ class ControllerContext {
     partitionsForTopic(topic).filter { partition => states.contains(partitionState(partition)) }.toSet
   }
 
+  def putPartitionLeadershipInfo(partition: TopicPartition,
+                                 leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Unit = {
+    val previous = partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
+    val replicaAssignment = partitionFullReplicaAssignment(partition)
+    updatePreferredReplicaImbalanceMetric(partition, Some(replicaAssignment), previous,
+      Some(replicaAssignment), Some(leaderIsrAndControllerEpoch))
+  }
+
+  def partitionLeadershipInfo(partition: TopicPartition): Option[LeaderIsrAndControllerEpoch] = {
+    partitionLeadershipInfo.get(partition)
+  }
+
+  def partitionsLeadershipInfo(): Iterable[(TopicPartition, LeaderIsrAndControllerEpoch)] = {
+    partitionLeadershipInfo
+  }
+
+  def partitionsWithLeaders(): Set[TopicPartition] = {
+    partitionLeadershipInfo.keySet

Review comment:
       This definition seems inconsistent with `partitionsWithoutLeaders`. I think you're just trying to preserve the existing logic. It might make sense to use a different name to avoid the apparent inconsistency? Maybe we could change `partitionsWithoutLeaders` to be `partitionsWithOfflineLeaders` or something like that. Looking at the caller, it looks like it would be reasonable to exclude topics which are being queued for deletion in both cases, but we could change that separately if you think it is risky.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org