You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/08/08 22:50:23 UTC
git commit: kafka-1578;
Controller should de-register all listeners upon designation;
patched by Guozhang Wang; reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 0dc243b92 -> c9894fb6e
kafka-1578; Controller should de-register all listeners upon designation; patched by Guozhang Wang; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9894fb6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9894fb6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9894fb6
Branch: refs/heads/trunk
Commit: c9894fb6ee4d33155bdb669a2e2f64ed9c3f9441
Parents: 0dc243b
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Fri Aug 8 13:50:08 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 8 13:50:08 2014 -0700
----------------------------------------------------------------------
.../kafka/controller/KafkaController.scala | 37 ++++++++++++++--
.../controller/PartitionStateMachine.scala | 44 ++++++++++++++++----
.../kafka/controller/ReplicaStateMachine.scala | 32 +++++++++++---
3 files changed, 97 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9894fb6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a7a21df..8ab4a1b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -169,6 +169,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
+
+ private val partitionReassignedListener = new PartitionsReassignedListener(this)
+ private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
+
newGauge(
"ActiveControllerCount",
new Gauge[Int] {
@@ -333,19 +337,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
* required to clean up internal controller data structures
*/
def onControllerResignation() {
+ // de-register listeners
+ deregisterReassignedPartitionsListener()
+ deregisterPreferredReplicaElectionListener()
+
+ // shutdown delete topic manager
if (deleteTopicManager != null)
deleteTopicManager.shutdown()
inLock(controllerContext.controllerLock) {
+ // de-register partition ISR listener for on-going partition reassignment task
+ deregisterReassignedPartitionsIsrChangeListeners()
+ // shutdown leader rebalance scheduler
if (config.autoLeaderRebalanceEnable)
autoRebalanceScheduler.shutdown()
-
+ // shutdown partition state machine
partitionStateMachine.shutdown()
+ // shutdown replica state machine
replicaStateMachine.shutdown()
+ // shutdown controller channel manager
if(controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
+ // reset controller context
controllerContext.epoch=0
controllerContext.epochZkVersion=0
brokerState.newState(RunningAsBroker)
@@ -870,11 +885,27 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
}
private def registerReassignedPartitionsListener() = {
- zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
+ zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
+ }
+
+ private def deregisterReassignedPartitionsListener() = {
+ zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
private def registerPreferredReplicaElectionListener() {
- zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, new PreferredReplicaElectionListener(this))
+ zkClient.subscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+ }
+
+ private def deregisterPreferredReplicaElectionListener() {
+ zkClient.unsubscribeDataChanges(ZkUtils.PreferredReplicaLeaderElectionPath, preferredReplicaElectionListener)
+ }
+
+ private def deregisterReassignedPartitionsIsrChangeListeners() {
+ controllerContext.partitionsBeingReassigned.foreach {
+ case (topicAndPartition, reassignedPartitionsContext) =>
+ val zkPartitionPath = ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition)
+ zkClient.unsubscribeDataChanges(zkPartitionPath, reassignedPartitionsContext.isrChangeListener)
+ }
}
private def readControllerEpochFromZookeeper() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9894fb6/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 34c70b6..e20b63a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -45,15 +45,16 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
private val controllerContext = controller.controllerContext
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
- var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
- val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+ private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
+ private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
private val hasStarted = new AtomicBoolean(false)
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
- this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
+ private val topicChangeListener = new TopicChangeListener()
+ private val deleteTopicsListener = new DeleteTopicsListener()
+ private val addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
private val stateChangeLogger = KafkaController.stateChangeLogger
- private var topicChangeListener: TopicChangeListener = null
- private var deleteTopicsListener: DeleteTopicsListener = null
- private var addPartitionsListener: mutable.Map[String, AddPartitionsListener] = mutable.Map.empty
+
+ this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
/**
* Invoked on successful controller election. First registers a topic change listener since that triggers all
@@ -63,9 +64,11 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
def startup() {
// initialize partition state
initializePartitionState()
+ // set started flag
hasStarted.set(true)
// try to move partitions to online state
triggerOnlinePartitionStateChange()
+
info("Started partition state machine with initial state -> " + partitionState.toString())
}
@@ -76,12 +79,30 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
registerDeleteTopicListener()
}
+ // de-register topic and partition change listeners
+ def deregisterListeners() {
+ deregisterTopicChangeListener()
+ addPartitionsListener.foreach {
+ case (topic, listener) =>
+ zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), listener)
+ }
+ addPartitionsListener.clear()
+ if(controller.config.deleteTopicEnable)
+ deregisterDeleteTopicListener()
+ }
+
/**
* Invoked on controller shutdown.
*/
def shutdown() {
+ // reset started flag
hasStarted.set(false)
+ // clear partition state
partitionState.clear()
+ // de-register all ZK listeners
+ deregisterListeners()
+
+ info("Stopped partition state machine")
}
/**
@@ -362,10 +383,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
private def registerTopicChangeListener() = {
- topicChangeListener = new TopicChangeListener()
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
}
+ private def deregisterTopicChangeListener() = {
+ zkClient.unsubscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+ }
+
def registerPartitionChangeListener(topic: String) = {
addPartitionsListener.put(topic, new AddPartitionsListener(topic))
zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
@@ -373,13 +397,17 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
def deregisterPartitionChangeListener(topic: String) = {
zkClient.unsubscribeDataChanges(ZkUtils.getTopicPath(topic), addPartitionsListener(topic))
+ addPartitionsListener.remove(topic)
}
private def registerDeleteTopicListener() = {
- deleteTopicsListener = new DeleteTopicsListener()
zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
}
+ private def deregisterDeleteTopicListener() = {
+ zkClient.unsubscribeChildChanges(ZkUtils.DeleteTopicsPath, deleteTopicsListener)
+ }
+
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
val topicAndPartition = TopicAndPartition(topic, partition)
ReplicationUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9894fb6/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ad9c7c4..3e87e1d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -48,12 +48,15 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
private val controllerContext = controller.controllerContext
private val controllerId = controller.config.brokerId
private val zkClient = controllerContext.zkClient
- var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
- val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
+ private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
+ private val brokerChangeListener = new BrokerChangeListener()
+ private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
private val hasStarted = new AtomicBoolean(false)
- this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
private val stateChangeLogger = KafkaController.stateChangeLogger
+ this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
+
+
/**
* Invoked on successful controller election. First registers a broker change listener since that triggers all
* state transitions for replicas. Initializes the state of replicas for all partitions by reading from zookeeper.
@@ -62,23 +65,38 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
def startup() {
// initialize replica state
initializeReplicaState()
+ // set started flag
hasStarted.set(true)
// move all Online replicas to Online
handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
+
info("Started replica state machine with initial state -> " + replicaState.toString())
}
- // register broker change listener
+ // register ZK listeners of the replica state machine
def registerListeners() {
+ // register broker change listener
registerBrokerChangeListener()
}
+ // de-register ZK listeners of the replica state machine
+ def deregisterListeners() {
+ // de-register broker change listener
+ deregisterBrokerChangeListener()
+ }
+
/**
* Invoked on controller shutdown.
*/
def shutdown() {
+ // reset started flag
hasStarted.set(false)
+ // reset replica state
replicaState.clear()
+ // de-register all ZK listeners
+ deregisterListeners()
+
+ info("Stopped replica state machine")
}
/**
@@ -295,7 +313,11 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
private def registerBrokerChangeListener() = {
- zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, new BrokerChangeListener())
+ zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+ }
+
+ private def deregisterBrokerChangeListener() = {
+ zkClient.unsubscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
}
/**