You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/06/03 23:06:46 UTC

git commit: KAFKA-928: New topics may not be processed after a ZooKeeper session expiration in the controller; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 4850519a2 -> 658427638


KAFKA-928: New topics may not be processed after a ZooKeeper session expiration in the controller; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65842763
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65842763
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65842763

Branch: refs/heads/0.8
Commit: 658427638f1b00ae1d284e579b4631f7119e54cc
Parents: 4850519
Author: Neha Narkhede <ne...@apache.org>
Authored: Mon Jun 3 14:06:08 2013 -0700
Committer: Neha Narkhede <ne...@apache.org>
Committed: Mon Jun 3 14:06:08 2013 -0700

----------------------------------------------------------------------
 .../kafka/controller/PartitionStateMachine.scala   |    5 ++---
 .../kafka/controller/ReplicaStateMachine.scala     |    5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/65842763/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 0f5ebde..deebed0 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -46,7 +46,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
     controllerId, controller.clientId)
   private val hasStarted = new AtomicBoolean(false)
-  private val hasShutdown = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -74,7 +73,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    hasShutdown.compareAndSet(false, true)
+    hasStarted.set(false)
     partitionState.clear()
   }
 
@@ -358,7 +357,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
       controllerContext.controllerLock synchronized {
-        if (!hasShutdown.get) {
+        if (hasStarted.get) {
           try {
             debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
             val currentChildren = JavaConversions.asBuffer(children).toSet

http://git-wip-us.apache.org/repos/asf/kafka/blob/65842763/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5283fcd..4fbb28e 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -45,7 +45,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
     controllerId, controller.clientId)
   private val hasStarted = new AtomicBoolean(false)
-  private val hasShutdown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -73,7 +72,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * Invoked on controller shutdown.
    */
   def shutdown() {
-    hasShutdown.compareAndSet(false, true)
+    hasStarted.set(false)
     replicaState.clear()
   }
 
@@ -252,7 +251,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       ControllerStats.leaderElectionTimer.time {
         info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
         controllerContext.controllerLock synchronized {
-          if (!hasShutdown.get) {
+          if (hasStarted.get) {
             try {
               val curBrokerIds = currentBrokerList.map(_.toInt).toSet
               val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds