Posted to commits@kafka.apache.org by ju...@apache.org on 2017/05/23 23:30:01 UTC

kafka git commit: KAFKA-5310; reset ControllerContext during resignation

Repository: kafka
Updated Branches:
  refs/heads/trunk 1bf648331 -> beeddc25d


KAFKA-5310; reset ControllerContext during resignation

This ticket is all about ControllerContext initialization and teardown. The key points are:
1. We should tear down ControllerContext during resignation instead of waiting for the next election to fix it up. A heap dump shows that the former controller keeps nearly all of its ControllerContext state lying around.
2. We don't properly tear down or reset ControllerContext.partitionsBeingReassigned. This can cause problems when the former controller is later re-elected as controller.

Suppose a partition assignment is initially R0. Now suppose a reassignment R1 gets stuck while C0 is the controller, and an admin tries to "undo" R1 (by deleting /admin/reassign_partitions, deleting /controller, and submitting another reassignment specifying R0). The new controller C1 may succeed with R0. If the controller role then moves back to C0, C0 will reattempt R1 from its stale in-memory state, even though that reassignment was cleared from ZooKeeper before C0 became controller again. As a result, the actual partition assignment in ZooKeeper is unexpectedly changed back to R1.
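
The scenario above can be reproduced in miniature. The following is only a sketch, not the KafkaController code: SketchContext, SketchController, onElection, onResignation and the partition name "t-0" are all hypothetical names invented for illustration, and the sketch assumes that a newly elected controller merges the reassignments it reads from ZooKeeper into its in-memory map rather than replacing the map.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified stand-in for the controller's in-memory state.
final class SketchContext {
  // partition name -> target replica list of an in-flight reassignment
  val partitionsBeingReassigned: mutable.Map[String, Seq[Int]] = mutable.Map.empty
}

// Hypothetical stand-in for a broker that can win and lose the controller election.
final class SketchController(resetOnResign: Boolean) {
  val context = new SketchContext

  // On election, merge what ZooKeeper currently lists into the in-memory map
  // (an assumption of this sketch), then return every reassignment this
  // controller would go on to attempt.
  def onElection(pendingInZk: Map[String, Seq[Int]]): Map[String, Seq[Int]] = {
    context.partitionsBeingReassigned ++= pendingInZk
    context.partitionsBeingReassigned.toMap
  }

  // On resignation, clear the in-memory state only if the fix is enabled.
  def onResignation(): Unit =
    if (resetOnResign) context.partitionsBeingReassigned.clear()
}

object StaleReassignmentSketch {
  // Target replicas of the stuck reassignment R1 (made-up broker ids).
  val r1: Seq[Int] = Seq(4, 5, 6)

  // C0 is elected while R1 is pending, resigns, and is re-elected after the
  // admin has removed R1 from ZooKeeper (so ZooKeeper lists nothing pending).
  def attemptedAfterReelection(resetOnResign: Boolean): Map[String, Seq[Int]] = {
    val c0 = new SketchController(resetOnResign)
    c0.onElection(Map("t-0" -> r1))
    c0.onResignation()
    c0.onElection(Map.empty)
  }

  def main(args: Array[String]): Unit = {
    // Without the reset, the stale R1 entry survives the resignation.
    println(attemptedAfterReelection(resetOnResign = false))
    // With the reset, nothing is reattempted.
    println(attemptedAfterReelection(resetOnResign = true))
  }
}
```

Without the reset, the re-elected C0 still holds R1 and would act on it; with the reset, only what ZooKeeper lists at election time is attempted.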

Author: Onur Karaman <ok...@linkedin.com>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #3122 from onurkaraman/KAFKA-5310


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/beeddc25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/beeddc25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/beeddc25

Branch: refs/heads/trunk
Commit: beeddc25d6b443ed344658a2562a162fb03048ef
Parents: 1bf6483
Author: Onur Karaman <ok...@linkedin.com>
Authored: Tue May 23 16:29:55 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 23 16:29:55 2017 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 23 ++++++++++++++------
 1 file changed, 16 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/beeddc25/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 41a88d9..69669cd 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -315,14 +315,8 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     replicaStateMachine.shutdown()
     deregisterBrokerChangeListener()
 
-    // shutdown controller channel manager
-    if(controllerContext.controllerChannelManager != null) {
-      controllerContext.controllerChannelManager.shutdown()
-      controllerContext.controllerChannelManager = null
-    }
     // reset controller context
-    controllerContext.epoch=0
-    controllerContext.epochZkVersion=0
+    resetControllerContext()
     brokerState.newState(RunningAsBroker)
 
     info("Broker %d resigned as the controller".format(config.brokerId))
@@ -690,6 +684,21 @@ class KafkaController(val config: KafkaConfig, zkUtils: ZkUtils, val brokerState
     pendingPreferredReplicaElections
   }
 
+  private def resetControllerContext(): Unit = {
+    if(controllerContext.controllerChannelManager != null) {
+      controllerContext.controllerChannelManager.shutdown()
+      controllerContext.controllerChannelManager = null
+    }
+    controllerContext.shuttingDownBrokerIds.clear()
+    controllerContext.epoch = 0
+    controllerContext.epochZkVersion = 0
+    controllerContext.allTopics = Set.empty
+    controllerContext.partitionReplicaAssignment.clear()
+    controllerContext.partitionLeadershipInfo.clear()
+    controllerContext.partitionsBeingReassigned.clear()
+    controllerContext.liveBrokers = Set.empty
+  }
+
   private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
     val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()