You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/11/29 20:17:49 UTC
kafka git commit: KAFKA-4415;
Reduce time to create and send UpdateMetadataRequest
Repository: kafka
Updated Branches:
refs/heads/trunk b11ead580 -> f37dab76f
KAFKA-4415; Reduce time to create and send UpdateMetadataRequest
Author: Dong Lin <li...@gmail.com>
Reviewers: Jiangjie Qin <be...@gmail.com>
Closes #2169 from lindong28/KAFKA-4415-2nd
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f37dab76
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f37dab76
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f37dab76
Branch: refs/heads/trunk
Commit: f37dab76f494e313eafc4f64d0eeaed82ddad0f7
Parents: b11ead5
Author: Dong Lin <li...@gmail.com>
Authored: Tue Nov 29 12:17:27 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Nov 29 12:17:27 2016 -0800
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 94 ++++++++++----------
1 file changed, 45 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f37dab76/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 0e17688..40071b2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -248,7 +248,8 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val controllerId: Int = controller.config.brokerId
val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
- val updateMetadataRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
+ val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
+ val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, PartitionStateInfo]
private val stateChangeLogger = KafkaController.stateChangeLogger
def newBatch() {
@@ -259,15 +260,17 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
if (stopReplicaRequestMap.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica state changes %s might be lost ".format(stopReplicaRequestMap.toString()))
- if (updateMetadataRequestMap.nonEmpty)
+ if (updateMetadataRequestBrokerSet.nonEmpty)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
- "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
+ "new one. Some UpdateMetadata state changes to brokers %s with partition info %s might be lost ".format(
+ updateMetadataRequestBrokerSet.toString(), updateMetadataRequestPartitionInfoMap.toString()))
}
def clear() {
leaderAndIsrRequestMap.clear()
stopReplicaRequestMap.clear()
- updateMetadataRequestMap.clear()
+ updateMetadataRequestBrokerSet.clear()
+ updateMetadataRequestPartitionInfoMap.clear()
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
@@ -302,7 +305,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
partitions: collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition],
callback: AbstractResponse => Unit = null) {
- def updateMetadataRequestMapFor(partition: TopicAndPartition, beingDeleted: Boolean) {
+ def updateMetadataRequestPartitionInfo(partition: TopicAndPartition, beingDeleted: Boolean) {
val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
@@ -313,10 +316,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
} else {
PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
}
- brokerIds.filter(b => b >= 0).foreach { brokerId =>
- updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
- updateMetadataRequestMap(brokerId).put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
- }
+ updateMetadataRequestPartitionInfoMap.put(new TopicPartition(partition.topic, partition.partition), partitionStateInfo)
case None =>
info("Leader not yet assigned for partition %s. Skip sending UpdateMetadataRequest.".format(partition))
}
@@ -332,14 +332,10 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
else
givenPartitions -- controller.deleteTopicManager.partitionsToBeDeleted
}
- if (filteredPartitions.isEmpty)
- brokerIds.filter(b => b >= 0).foreach { brokerId =>
- updateMetadataRequestMap.getOrElseUpdate(brokerId, mutable.Map.empty[TopicPartition, PartitionStateInfo])
- }
- else
- filteredPartitions.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = false))
- controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestMapFor(partition, beingDeleted = true))
+ updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
+ filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false))
+ controller.deleteTopicManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
}
def sendRequestsToBrokers(controllerEpoch: Int) {
@@ -360,50 +356,50 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
- partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
- )
+ partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
topicPartition -> partitionState
}
val leaderAndIsrRequest = new LeaderAndIsrRequest(controllerId, controllerEpoch, partitionStates.asJava, leaders.asJava)
controller.sendRequest(broker, ApiKeys.LEADER_AND_ISR, None, leaderAndIsrRequest, null)
}
leaderAndIsrRequestMap.clear()
- updateMetadataRequestMap.foreach { case (broker, partitionStateInfos) =>
- partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
- "to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
- broker, p._1)))
- val partitionStates = partitionStateInfos.map { case (topicPartition, partitionStateInfo) =>
- val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
- val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
- leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
- partitionStateInfo.allReplicas.map(Integer.valueOf).asJava
- )
- topicPartition -> partitionState
- }
+ updateMetadataRequestPartitionInfoMap.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s " +
+ "to brokers %s for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
+ updateMetadataRequestBrokerSet.toString(), p._1)))
+ val partitionStates = updateMetadataRequestPartitionInfoMap.map { case (topicPartition, partitionStateInfo) =>
+ val LeaderIsrAndControllerEpoch(leaderIsr, controllerEpoch) = partitionStateInfo.leaderIsrAndControllerEpoch
+ val partitionState = new requests.PartitionState(controllerEpoch, leaderIsr.leader,
+ leaderIsr.leaderEpoch, leaderIsr.isr.map(Integer.valueOf).asJava, leaderIsr.zkVersion,
+ partitionStateInfo.allReplicas.map(Integer.valueOf).asJava)
+ topicPartition -> partitionState
+ }
- val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
- else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
- else 0: Short
+ val version = if (controller.config.interBrokerProtocolVersion >= KAFKA_0_10_0_IV1) 2: Short
+ else if (controller.config.interBrokerProtocolVersion >= KAFKA_0_9_0) 1: Short
+ else 0: Short
- val updateMetadataRequest =
- if (version == 0) {
- val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
- new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
- }
- else {
- val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
- val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
- securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
- }
- new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
+ val updateMetadataRequest =
+ if (version == 0) {
+ val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map(_.getNode(SecurityProtocol.PLAINTEXT))
+ new UpdateMetadataRequest(controllerId, controllerEpoch, liveBrokers.asJava, partitionStates.asJava)
+ }
+ else {
+ val liveBrokers = controllerContext.liveOrShuttingDownBrokers.map { broker =>
+ val endPoints = broker.endPoints.map { case (securityProtocol, endPoint) =>
+ securityProtocol -> new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port)
}
- new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+ new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
}
+ new UpdateMetadataRequest(version, controllerId, controllerEpoch, partitionStates.asJava, liveBrokers.asJava)
+ }
+ updateMetadataRequestBrokerSet.foreach {broker =>
controller.sendRequest(broker, ApiKeys.UPDATE_METADATA_KEY, Some(version), updateMetadataRequest, null)
}
- updateMetadataRequestMap.clear()
+ updateMetadataRequestBrokerSet.clear()
+ updateMetadataRequestPartitionInfoMap.clear()
+
stopReplicaRequestMap.foreach { case (broker, replicaInfoList) =>
val stopReplicaWithDelete = replicaInfoList.filter(_.deletePartition).map(_.replica).toSet
val stopReplicaWithoutDelete = replicaInfoList.filterNot(_.deletePartition).map(_.replica).toSet
@@ -433,9 +429,9 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging
error("Haven't been able to send leader and isr requests, current state of " +
s"the map is $leaderAndIsrRequestMap. Exception message: $e")
}
- if (updateMetadataRequestMap.nonEmpty) {
- error("Haven't been able to send metadata update requests, current state of " +
- s"the map is $updateMetadataRequestMap. Exception message: $e")
+ if (updateMetadataRequestBrokerSet.nonEmpty) {
+ error(s"Haven't been able to send metadata update requests to brokers $updateMetadataRequestBrokerSet, " +
+ s"current state of the partition info is $updateMetadataRequestPartitionInfoMap. Exception message: $e")
}
if (stopReplicaRequestMap.nonEmpty) {
error("Haven't been able to send stop replica requests, current state of " +