You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2016/11/29 18:03:35 UTC
kafka git commit: KAFKA-4443;
Controller should send UpdateMetadataRequest prior to
LeaderAndIsrRequest during failover
Repository: kafka
Updated Branches:
refs/heads/trunk 6f0cbe721 -> 7ed3768fb
KAFKA-4443; Controller should send UpdateMetadataRequest prior to LeaderAndIsrRequest during failover
Author: Dong Lin <li...@gmail.com>
Reviewers: Jiangjie Qin <be...@gmail.com>, Jun Rao <ju...@gmail.com>
Closes #2168 from lindong28/KAFKA-4443
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7ed3768f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7ed3768f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7ed3768f
Branch: refs/heads/trunk
Commit: 7ed3768fb6e16ec97815e1676d27833569ec2a98
Parents: 6f0cbe7
Author: Dong Lin <li...@gmail.com>
Authored: Tue Nov 29 10:03:17 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Tue Nov 29 10:03:17 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/controller/KafkaController.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7ed3768f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 2a6f61c..7ec38ee 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -327,6 +327,10 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
+ // We need to send UpdateMetadataRequest after the controller context is initialized and before the state machines are started.
+ // This is because brokers need to receive the list of live brokers from UpdateMetadataRequest first in order to process
+ // any LeaderAndIsrRequest that is generated by replicaStateMachine.startup() and partitionStateMachine.startup().
+ sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
replicaStateMachine.startup()
partitionStateMachine.startup()
// register the partition change listeners for all existing topics on failover
@@ -335,7 +339,6 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
/* send partition leadership info to all live brokers */
- sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()