You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/11/29 18:03:35 UTC

kafka git commit: KAFKA-4443; Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest during failover

Repository: kafka
Updated Branches:
  refs/heads/trunk 6f0cbe721 -> 7ed3768fb


KAFKA-4443; Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest during failover

Author: Dong Lin <li...@gmail.com>

Reviewers: Jiangjie Qin <be...@gmail.com>, Jun Rao <ju...@gmail.com>

Closes #2168 from lindong28/KAFKA-4443


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7ed3768f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7ed3768f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7ed3768f

Branch: refs/heads/trunk
Commit: 7ed3768fb6e16ec97815e1676d27833569ec2a98
Parents: 6f0cbe7
Author: Dong Lin <li...@gmail.com>
Authored: Tue Nov 29 10:03:17 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Nov 29 10:03:17 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/controller/KafkaController.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7ed3768f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2a6f61c..7ec38ee 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -327,6 +327,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
       initializeControllerContext()
+      // We need to send MetadataUpdateRequest after controller context is initialized and before state machines are started.
+      // This is because broker needs to receive the list of live brokers from MetadataUpdateRequest first in order to process
+      // any LeaderAndIsrRequest that is generated by replicaStateMachine.startup() and partitionStateMachine.startup().
+      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
       replicaStateMachine.startup()
       partitionStateMachine.startup()
       // register the partition change listeners for all existing topics on failover
@@ -335,7 +339,6 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
       maybeTriggerPartitionReassignment()
       maybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
-      sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
       if (config.autoLeaderRebalanceEnable) {
         info("starting the partition rebalance scheduler")
         autoRebalanceScheduler.startup()