You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/11/29 20:17:49 UTC

kafka git commit: KAFKA-4415; Reduce time to create and send UpdateMetadataRequest

Repository: kafka
Updated Branches:
  refs/heads/trunk b11ead580 -> f37dab76f


KAFKA-4415; Reduce time to create and send UpdateMetadataRequest

Author: Dong Lin <li...@gmail.com>

Reviewers: Jiangjie Qin <be...@gmail.com>

Closes #2169 from lindong28/KAFKA-4415-2nd


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f37dab76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f37dab76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f37dab76

Branch: refs/heads/trunk
Commit: f37dab76f494e313eafc4f64d0eeaed82ddad0f7
Parents: b11ead5
Author: Dong Lin <li...@gmail.com>
Authored: Tue Nov 29 12:17:27 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Nov 29 12:17:27 2016 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   | 94 ++++++++++----------
 1 file changed, 45 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f37dab76/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 0e17688..40071b2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -248,7 +248,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
   val controllerId: Int = controller.config.brokerId
   val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
   val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
-  val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
+  val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
+  val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, PartitionStateInfo]
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
   def newBatch() {
@@ -259,15 +260,17 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
     if (stopReplicaRequestMap.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
-    if (updateMetadataRequestMap.nonEmpty)
+    if (updateMetadataRequestBrokerSet.nonEmpty)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
-        "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
+        "new one. Some UpdateMetadata state changes to brokers %s with partition info %s might be lost ".format(
+          updateMetadataRequestBrokerSet.toString(), updateMetadataRequestPartitionInfoMap.toString()))
   }
 
   def clear() {
     leaderAndIsrRequestMap.clear()
     stopReplicaRequestMap.clear()
-    updateMetadataRequestMap.clear()
+    updateMetadataRequestBrokerSet.clear()
+    updateMetadataRequestPartitionInfoMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
@@ -302,7 +305,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
                                          partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
                                          callback: AbstractResponse => Unit = null) {
-    def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
+    def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
         case Some(leaderIsrAndControllerEpoch) =>
@@ -313,10 +316,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           } else {
             PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
           }
-          brokerIds.filter(b => b >= 0).foreach { brokerId =>
-            updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
-            updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
-          }
+          updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
         case None =>
           info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
       }
@@ -332,14 +332,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
       else
         givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
     }
-    if (filteredPartitions.isEmpty)
-      brokerIds.filter(b => b >= 0).foreach { brokerId =>
-        updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
-      }
-    else
-      filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
 
-    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
+    updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
+    filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false))
+    controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
   }
 
   def sendRequestsToBrokers(controllerEpoch: Int) {
@@ -360,50 +356,50 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
           val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
             leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
-            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
-          )
+            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
           topicPartition -> partitionState
         }
         val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
         controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
       }
       leaderAndIsrRequestMap.clear()
-      updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
 
-        partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
-          "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
-          broker, p._1)))
-        val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
-          val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
-          val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
-            leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
-            partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
-          )
-          topicPartition -> partitionState
-        }
+      updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
+        "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
+        updateMetadataRequestBrokerSet.toString(), p._1)))
+      val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
+        val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
+        val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
+          leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
+          partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
+        topicPartition -> partitionState
+      }
 
-        val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
-                      else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
-                      else 0: Short
+      val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
+                    else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
+                    else 0: Short
 
-        val updateMetadataRequest =
-          if (version == 0) {
-            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
-            new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
-          }
-          else {
-            val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
-              val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
-                securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
-              }
-              new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
+      val updateMetadataRequest =
+        if (version == 0) {
+          val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
+          new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
+        }
+        else {
+          val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
+            val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
+              securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
             }
-            new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
           }
+          new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+        }
 
+      updateMetadataRequestBrokerSet.foreach {broker =>
         controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
       }
-      updateMetadataRequestMap.clear()
+      updateMetadataRequestBrokerSet.clear()
+      updateMetadataRequestPartitionInfoMap.clear()
+
       stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
         val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
         val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
@@ -433,9 +429,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
           error("Haven't been able to send leader and isr requests, current state of " +
               s"the map is $leaderAndIsrRequestMap. Exception message: $e")
         }
-        if (updateMetadataRequestMap.nonEmpty) {
-          error("Haven't been able to send metadata update requests, current state of " +
-              s"the map is $updateMetadataRequestMap. Exception message: $e")
+        if (updateMetadataRequestBrokerSet.nonEmpty) {
+          error(s"Haven't been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " +
+                s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e")
         }
         if (stopReplicaRequestMap.nonEmpty) {
           error("Haven't been able to send stop replica requests, current state of " +