You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/08 22:50:23 UTC

git commit: kafka-1578; Controller should de-register all listeners upon designation; patched by Guozhang Wang; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 0dc243b92 -> c9894fb6e


kafka-1578; Controller should de-register all listeners upon designation; patched by Guozhang Wang; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9894fb6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9894fb6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9894fb6

Branch: refs/heads/trunk
Commit: c9894fb6ee4d33155bdb669a2e2f64ed9c3f9441
Parents: 0dc243b
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Aug 8 13:50:08 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 8 13:50:08 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/KafkaController.scala      | 37 ++++++++++++++--
 .../controller/PartitionStateMachine.scala      | 44 ++++++++++++++++----
 .../kafka/controller/ReplicaStateMachine.scala  | 32 +++++++++++---
 3 files changed, 97 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9894fb6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a7a21df..8ab4a1b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -169,6 +169,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
   private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
+
+  private val partitionReassignedListener = new PartitionsReassignedListener(this)
+  private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
+
   newGauge(
     "ActiveControllerCount",
     new Gauge[Int] {
@@ -333,19 +337,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
    * required to clean up internal controller data structures
    */
   def onControllerResignation() {
+    // de-register listeners
+    deregisterReassignedPartitionsListener()
+    deregisterPreferredReplicaElectionListener()
+
+    // shutdown delete topic manager
     if (deleteTopicManager != null)
       deleteTopicManager.shutdown()
 
     inLock(controllerContext.controllerLock) {
+      // de-register partition ISR listener for on-going partition reassignment task
+      deregisterReassignedPartitionsIsrChangeListeners()
+      // shutdown leader rebalance scheduler
       if (config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
-
+      // shutdown partition state machine
       partitionStateMachine.shutdown()
+      // shutdown replica state machine
       replicaStateMachine.shutdown()
+      // shutdown controller channel manager
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
       }
+      // reset controller context
       controllerContext.epoch=0
       controllerContext.epochZkVersion=0
       brokerState.newState(RunningAsBroker)
@@ -870,11 +885,27 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
   }
 
   private def registerReassignedPartitionsListener() = {
-    zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
+    zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
+  }
+
+  private def deregisterReassignedPartitionsListener() = {
+    zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
   }
 
   private def registerPreferredReplicaElectionListener() {
-    zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
+    zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+  }
+
+  private def deregisterPreferredReplicaElectionListener() {
+    zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+  }
+
+  private def deregisterReassignedPartitionsIsrChangeListeners() {
+    controllerContext.partitionsBeingReassigned.foreach {
+      case (topicAndPartition, reassignedPartitionsContext) =>
+        val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
+        zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+    }
   }
 
   private def readControllerEpochFromZookeeper() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9894fb6/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 34c70b6..e20b63a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -45,15 +45,16 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
-  var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+  private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
-  this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
+  private val topicChangeListener = new TopicChangeListener()
+  private val deleteTopicsListener = new DeleteTopicsListener()
+  private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
   private val stateChangeLogger = KafkaController.stateChangeLogger
-  private var topicChangeListener: TopicChangeListener = null
-  private var deleteTopicsListener: DeleteTopicsListener = null
-  private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
+
+  this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
 
   /**
    * Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -63,9 +64,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   def startup() {
     // initialize partition state
     initializePartitionState()
+    // set started flag
     hasStarted.set(true)
     // try to move partitions to online state
     triggerOnlinePartitionStateChange()
+
     info("Started partition state machine with initial state -> " + partitionState.toString())
   }
 
@@ -76,12 +79,30 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       registerDeleteTopicListener()
   }
 
+  // de-register topic and partition change listeners
+  def deregisterListeners() {
+    deregisterTopicChangeListener()
+    addPartitionsListener.foreach {
+      case (topic, listener) =>
+        zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+    }
+    addPartitionsListener.clear()
+    if(controller.config.deleteTopicEnable)
+      deregisterDeleteTopicListener()
+  }
+
   /**
    * Invoked on controller shutdown.
    */
   def shutdown() {
+    // reset started flag
     hasStarted.set(false)
+    // clear partition state
     partitionState.clear()
+    // de-register all ZK listeners
+    deregisterListeners()
+
+    info("Stopped partition state machine")
   }
 
   /**
@@ -362,10 +383,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   private def registerTopicChangeListener() = {
-    topicChangeListener = new TopicChangeListener()
     zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
   }
 
+  private def deregisterTopicChangeListener() = {
+    zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+  }
+
   def registerPartitionChangeListener(topic: String) = {
     addPartitionsListener.put(topic, new AddPartitionsListener(topic))
     zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
@@ -373,13 +397,17 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
   def deregisterPartitionChangeListener(topic: String) = {
     zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+    addPartitionsListener.remove(topic)
   }
 
   private def registerDeleteTopicListener() = {
-    deleteTopicsListener = new DeleteTopicsListener()
     zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
   }
 
+  private def deregisterDeleteTopicListener() = {
+    zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
+  }
+
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
     val topicAndPartition = TopicAndPartition(topic, partition)
     ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9894fb6/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ad9c7c4..3e87e1d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -48,12 +48,15 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
-  var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+  private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
+  private val brokerChangeListener = new BrokerChangeListener()
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
-  this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = KafkaController.stateChangeLogger
 
+  this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
+
+
   /**
    * Invoked on successful controller election. First registers a broker change listener since that triggers all
    * state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
@@ -62,23 +65,38 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   def startup() {
     // initialize replica state
     initializeReplicaState()
+    // set started flag
     hasStarted.set(true)
     // move all Online replicas to Online
     handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
+
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
 
-  // register broker change listener
+  // register ZK listeners of the replica state machine
   def registerListeners() {
+    // register broker change listener
     registerBrokerChangeListener()
   }
 
+  // de-register ZK listeners of the replica state machine
+  def deregisterListeners() {
+    // de-register broker change listener
+    deregisterBrokerChangeListener()
+  }
+
   /**
    * Invoked on controller shutdown.
    */
   def shutdown() {
+    // reset started flag
     hasStarted.set(false)
+    // reset replica state
     replicaState.clear()
+    // de-register all ZK listeners
+    deregisterListeners()
+
+    info("Stopped replica state machine")
   }
 
   /**
@@ -295,7 +313,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   }
 
   private def registerBrokerChangeListener() = {
-    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
+    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+  }
+
+  private def deregisterBrokerChangeListener() = {
+    zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
   }
 
   /**