You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/18 21:22:38 UTC

[kafka] 01/01: Do not use option type in updateBrokerMetadata

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch KAFKA-8237
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4b85b4837024d5e784b4e2268993d1e1de2a22a5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 18 14:22:06 2019 -0700

    Do not use option type in updateBrokerMetadata
---
 .../main/scala/kafka/controller/ControllerContext.scala |  5 +++--
 .../main/scala/kafka/controller/KafkaController.scala   | 17 ++++++++++-------
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 2bdfcba..f6f5880 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -116,8 +116,9 @@ class ControllerContext {
     liveBrokerEpochs = liveBrokerEpochs.filterKeys(id => !brokerIds.contains(id))
   }
 
-  def updateBrokerMetadata(oldMetadata: Option[Broker], newMetadata: Option[Broker]): Unit = {
-    liveBrokers = liveBrokers -- oldMetadata ++ newMetadata
+  def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
+    liveBrokers -= oldMetadata
+    liveBrokers += newMetadata
   }
 
   // getter
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 694af0a..d31d1fc 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1342,13 +1342,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
 
     override def process(): Unit = {
       if (!isActive) return
-      val newMetadata = zkClient.getBroker(brokerId)
-      val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId)
-      if (newMetadata.nonEmpty && oldMetadata.nonEmpty && newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) {
-        info(s"Updated broker: ${newMetadata.get}")
-
-        controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
-        onBrokerUpdate(brokerId)
+      val newMetadataOpt = zkClient.getBroker(brokerId)
+      val oldMetadataOpt = controllerContext.liveBrokers.find(_.id == brokerId)
+      if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
+        val oldMetadata = oldMetadataOpt.get
+        val newMetadata = newMetadataOpt.get
+        if (newMetadata.endPoints != oldMetadata.endPoints) {
+          info(s"Updated broker metadata: $oldMetadata -> $newMetadata")
+          controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
+          onBrokerUpdate(brokerId)
+        }
       }
     }
   }