You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/04/18 21:22:38 UTC
[kafka] 01/01: Do not use option type in updateBrokerMetadata
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch KAFKA-8237
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 4b85b4837024d5e784b4e2268993d1e1de2a22a5
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Thu Apr 18 14:22:06 2019 -0700
Do not use option type in updateBrokerMetadata
---
.../main/scala/kafka/controller/ControllerContext.scala | 5 +++--
.../main/scala/kafka/controller/KafkaController.scala | 17 ++++++++++-------
2 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 2bdfcba..f6f5880 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -116,8 +116,9 @@ class ControllerContext {
liveBrokerEpochs = liveBrokerEpochs.filterKeys(id => !brokerIds.contains(id))
}
- def updateBrokerMetadata(oldMetadata: Option[Broker], newMetadata: Option[Broker]): Unit = {
- liveBrokers = liveBrokers -- oldMetadata ++ newMetadata
+ def updateBrokerMetadata(oldMetadata: Broker, newMetadata: Broker): Unit = {
+ liveBrokers -= oldMetadata
+ liveBrokers += newMetadata
}
// getter
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 694af0a..d31d1fc 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1342,13 +1342,16 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti
override def process(): Unit = {
if (!isActive) return
- val newMetadata = zkClient.getBroker(brokerId)
- val oldMetadata = controllerContext.liveBrokers.find(_.id == brokerId)
- if (newMetadata.nonEmpty && oldMetadata.nonEmpty && newMetadata.map(_.endPoints) != oldMetadata.map(_.endPoints)) {
- info(s"Updated broker: ${newMetadata.get}")
-
- controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
- onBrokerUpdate(brokerId)
+ val newMetadataOpt = zkClient.getBroker(brokerId)
+ val oldMetadataOpt = controllerContext.liveBrokers.find(_.id == brokerId)
+ if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
+ val oldMetadata = oldMetadataOpt.get
+ val newMetadata = newMetadataOpt.get
+ if (newMetadata.endPoints != oldMetadata.endPoints) {
+ info(s"Updated broker metadata: $oldMetadata -> $newMetadata")
+ controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
+ onBrokerUpdate(brokerId)
+ }
}
}
}