You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/04/04 08:42:59 UTC

git commit: KAFKA-840 Post commit

Updated Branches:
  refs/heads/0.8 5a50f7e55 -> 3ead78b96


KAFKA-840 Post commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3ead78b9
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3ead78b9
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3ead78b9

Branch: refs/heads/0.8
Commit: 3ead78b966a8464efa6d53fbb81fcb0efc75ba63
Parents: 5a50f7e
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Apr 3 23:40:05 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Apr 3 23:40:05 2013 -0700

----------------------------------------------------------------------
 .../kafka/controller/PartitionStateMachine.scala   |   13 +++++++------
 .../kafka/controller/ReplicaStateMachine.scala     |   13 +++++++------
 2 files changed, 14 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3ead78b9/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 156bb10..c017727 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -44,7 +44,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
-  private val isRunning = new AtomicBoolean(false)
+  private val hasStarted = new AtomicBoolean(false)
+  private val hasShutdown = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -57,7 +58,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   def startup() {
     // initialize partition state
     initializePartitionState()
-    isRunning.set(true)
+    hasStarted.set(true)
     // try to move partitions to online state
     triggerOnlinePartitionStateChange()
     info("Started partition state machine with initial state -> " + partitionState.toString())
@@ -72,7 +73,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    isRunning.compareAndSet(true, false)
+    hasShutdown.compareAndSet(false, true)
     partitionState.clear()
   }
 
@@ -125,7 +126,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                                 leaderSelector: PartitionLeaderSelector) {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    if (!isRunning.get)
+    if (!hasStarted.get)
       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
                                             "the partition state machine has not started")
                                               .format(controllerId, controller.epoch, topicAndPartition, targetState))
@@ -356,8 +357,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
-      if(isRunning.get) {
-        controllerContext.controllerLock synchronized {
+      controllerContext.controllerLock synchronized {
+        if (!hasShutdown.get) {
           try {
             debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
             val currentChildren = JavaConversions.asBuffer(children).toSet

http://git-wip-us.apache.org/repos/asf/kafka/blob/3ead78b9/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ef2356f..bea1644 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -43,7 +43,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
-  private val isRunning = new AtomicBoolean(false)
+  private val hasStarted = new AtomicBoolean(false)
+  private val hasShutdown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -55,7 +56,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   def startup() {
     // initialize replica state
     initializeReplicaState()
-    isRunning.set(true)
+    hasStarted.set(true)
     // move all Online replicas to Online
     handleStateChanges(ZkUtils.getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq,
       controllerContext.liveBrokerIds.toSeq), OnlineReplica)
@@ -71,7 +72,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    isRunning.compareAndSet(true, false)
+    hasShutdown.compareAndSet(false, true)
     replicaState.clear()
   }
 
@@ -102,7 +103,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    */
   def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
     val topicAndPartition = TopicAndPartition(topic, partition)
-    if (!isRunning.get)
+    if (!hasStarted.get)
       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                             "to %s failed because replica state machine has not started")
                                               .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
@@ -239,8 +240,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
       ControllerStats.leaderElectionTimer.time {
         info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
-        if(isRunning.get) {
-          controllerContext.controllerLock synchronized {
+        controllerContext.controllerLock synchronized {
+          if (!hasShutdown.get) {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds