You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cmccabe (via GitHub)" <gi...@apache.org> on 2023/04/18 18:56:49 UTC

[GitHub] [kafka] cmccabe commented on a diff in pull request #13606: KAFKA-14918 Only send controller RPCs to migrating ZK brokers

cmccabe commented on code in PR #13606:
URL: https://github.com/apache/kafka/pull/13606#discussion_r1170454667


##########
core/src/main/scala/kafka/migration/MigrationPropagator.scala:
##########
@@ -70,14 +70,24 @@ class MigrationPropagator(
 
   override def publishMetadata(image: MetadataImage): Unit = {
     val oldImage = _image
-    val addedBrokers = new util.HashSet[Integer](image.cluster().brokers().keySet())
-    addedBrokers.removeAll(oldImage.cluster().brokers().keySet())
-    val removedBrokers = new util.HashSet[Integer](oldImage.cluster().brokers().keySet())
-    removedBrokers.removeAll(image.cluster().brokers().keySet())
-
-    removedBrokers.asScala.foreach(id => channelManager.removeBroker(id))
-    addedBrokers.asScala.foreach(id =>
-      channelManager.addBroker(Broker.fromBrokerRegistration(image.cluster().broker(id))))
+    val prevBrokers = oldImage.cluster().brokers().values().asScala
+      .filter(_.isMigratingZkBroker)
+      .filterNot(_.fenced)
+      .map(Broker.fromBrokerRegistration)
+      .toSet
+
+    val aliveBrokers = image.cluster().brokers().values().asScala
+      .filter(_.isMigratingZkBroker)
+      .filterNot(_.fenced)
+      .map(Broker.fromBrokerRegistration)
+      .toSet
+
+    val addedBrokers = aliveBrokers -- prevBrokers
+    val removedBrokers = prevBrokers -- aliveBrokers
+
+    stateChangeLogger.logger.debug(s"Adding brokers $addedBrokers, removing brokers $removedBrokers.")

Review Comment:
   Can we make this INFO, and only log it when `addedBrokers` or `removedBrokers` is non-empty?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org