Posted to commits@kafka.apache.org by ju...@apache.org on 2014/01/15 17:35:11 UTC

git commit: kafka-1202; optimize ZK access in KafkaController; also incorporating fixes in kafka-1020; patched by Jun Rao and Guozhang Wang; reviewed by Neha Narkhede and Joel Koshy

Updated Branches:
  refs/heads/trunk d401292ab -> 855340a2e


kafka-1202; optimize ZK access in KafkaController; also incorporating fixes in kafka-1020; patched by Jun Rao and Guozhang Wang; reviewed by Neha Narkhede and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/855340a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/855340a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/855340a2

Branch: refs/heads/trunk
Commit: 855340a2e65ffbb79520c49d0b9a231b94acd538
Parents: d401292
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 15 08:37:14 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 15 08:37:14 2014 -0800

----------------------------------------------------------------------
 .../controller/ControllerChannelManager.scala   |   4 -
 .../kafka/controller/KafkaController.scala      | 132 ++++++++++++-------
 .../controller/PartitionLeaderSelector.scala    |  40 +++---
 .../controller/PartitionStateMachine.scala      |  22 +++-
 .../kafka/controller/ReplicaStateMachine.scala  |  35 ++---
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  43 ------
 6 files changed, 146 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
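
The heart of this patch is replacing per-event ZooKeeper scans (ZkUtils.getPartitionsAssignedToBroker, ZkUtils.getAllReplicasOnBroker) with lookups against the controller's cached partitionReplicaAssignment map, via the new ControllerContext helpers (partitionsOnBroker, replicasOnBrokers, allLiveReplicas). A minimal sketch of that pattern, using simplified stand-in types rather than the actual controller classes:

    // Sketch of the caching pattern this patch introduces: derive broker -> replica
    // assignments from the in-memory map instead of reading ZooKeeper on every event.
    // TopicAndPartition/PartitionAndReplica below are simplified stand-ins.
    object ReplicaCacheSketch {
      case class TopicAndPartition(topic: String, partition: Int)
      case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

      // cached copy of the replica assignment normally read from ZK once at controller startup
      val partitionReplicaAssignment: Map[TopicAndPartition, Seq[Int]] = Map(
        TopicAndPartition("t1", 0) -> Seq(1, 2, 3),
        TopicAndPartition("t1", 1) -> Seq(2, 3, 4))

      // shape of the new ControllerContext.replicasOnBrokers, computed purely from the cache
      def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] =
        for {
          brokerId <- brokerIds
          (tp, replicas) <- partitionReplicaAssignment
          if replicas.contains(brokerId)
        } yield PartitionAndReplica(tp.topic, tp.partition, brokerId)

      def main(args: Array[String]): Unit =
        println(replicasOnBrokers(Set(2)))  // replicas hosted on broker 2, no ZK round trips
    }

This is the shape of the calls in onBrokerStartup/onBrokerFailure below, which previously re-read the full assignment from ZK.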


http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 33a84fb..ea8485b 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -203,10 +203,6 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
     if(stopAndDeleteReplicaRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
-    leaderAndIsrRequestMap.clear()
-    stopReplicaRequestMap.clear()
-    updateMetadataRequestMap.clear()
-    stopAndDeleteReplicaRequestMap.clear()
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 03ef9cf..a0267ae 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -69,6 +69,26 @@ class ControllerContext(val zkClient: ZkClient,
 
   def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
   def liveOrShuttingDownBrokers = liveBrokersUnderlying
+
+  def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
+    partitionReplicaAssignment
+      .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
+      .map { case(topicAndPartition, replicas) => topicAndPartition }
+      .toSet
+  }
+
+  def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
+    brokerIds.map { brokerId =>
+      partitionReplicaAssignment
+        .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
+        .map { case(topicAndPartition, replicas) =>
+                 new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
+    }.flatten.toSet
+  }
+
+  def allLiveReplicas(): Set[PartitionAndReplica] = {
+    replicasOnBrokers(liveBrokerIds)
+  }
 }
 
 trait KafkaControllerMBean {
@@ -190,13 +210,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
       }
 
-      val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized {
-        getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map {
-          case(topic, partition) =>
-            val topicAndPartition = TopicAndPartition(topic, partition)
-            (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)
+      val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
+        controllerContext.controllerLock synchronized {
+          controllerContext.partitionsOnBroker(id)
+            .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
         }
-      }
 
       allPartitionsAndReplicationFactorOnBroker.foreach {
         case(topicAndPartition, replicationFactor) =>
@@ -328,7 +346,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     sendUpdateMetadataRequest(newBrokers)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(newBrokersSet), OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -366,12 +384,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(deadBrokersSet), OfflineReplica)
   }
 
   /**
-   * This callback is invoked by the partition state machine's topic change listener with the list of failed brokers
-   * as input. It does the following -
+   * This callback is invoked by the partition state machine's topic change listener with the list of new topics
+   * and partitions as input. It does the following -
    * 1. Registers partition change listener. This is not required until KAFKA-347
    * 2. Invokes the new partition callback
    */
@@ -383,7 +401,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   /**
-   * This callback is invoked by the topic change callback with the list of failed brokers as input.
+   * This callback is invoked by the partition state machine's partition change listener with the list of new partitions.
    * It does the following -
    * 1. Move the newly created partitions to the NewPartition state
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
@@ -399,60 +417,84 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   /**
    * This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
    * reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
-   * Reassigning replicas for a partition goes through a few stages -
+   * Reassigning replicas for a partition goes through a few steps listed in the code.
    * RAR = Reassigned replicas
-   * AR = Original list of replicas for partition
-   * 1. Write new AR = AR + RAR. At this time, update the leader epoch in zookeeper and send a LeaderAndIsr request with
-   *    AR = AR + RAR to all replicas in (AR + RAR)
-   * 2. Start new replicas RAR - AR.
-   * 3. Wait until new replicas are in sync with the leader
-   * 4. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
+   * OAR = Original list of replicas for partition
+   * AR = current assigned replicas
+   *
+   * 1. Update AR in ZK with OAR + RAR.
+   * 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
+   *    of the leader epoch in zookeeper.
+   * 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
+   * 4. Wait until all replicas in RAR are in sync with the leader.
+   * 5. Move all replicas in RAR to OnlineReplica state.
+   * 6. Set AR to RAR in memory.
+   * 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
    *    will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
    *    In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
-   *    RAR - AR back in the ISR
-   * 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part
-   *    of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to
-   *    the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in
-   *    RAR - AR. As part of the NonExistentReplica state change, we delete replicas in RAR - AR.
-   * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR.
-   * 7. Remove partition from the /admin/reassign_partitions path
+   *    RAR - OAR back in the isr.
+   * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
+   *    isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
+   *    After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
+   * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to
+   *    the replicas in OAR - RAR to physically delete the replicas on disk.
+   * 10. Update AR in ZK with RAR.
+   * 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
+   * 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
+   *
+   * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
+   * may go through the following transition.
+   * AR                 leader/isr
+   * {1,2,3}            1/{1,2,3}           (initial state)
+   * {1,2,3,4,5,6}      1/{1,2,3}           (step 2)
+   * {1,2,3,4,5,6}      1/{1,2,3,4,5,6}     (step 4)
+   * {1,2,3,4,5,6}      4/{1,2,3,4,5,6}     (step 7)
+   * {1,2,3,4,5,6}      4/{4,5,6}           (step 8)
+   * {4,5,6}            4/{4,5,6}           (step 10)
+   *
+   * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
+   * This way, if the controller crashes before that step, we can still recover.
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
+      case false =>
+        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+          "reassigned not yet caught up with the leader")
+        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+        //1. Update AR in ZK with OAR + RAR.
+        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+        //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
+        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+          newAndOldReplicas.toSeq)
+        //3. replicas in RAR - OAR -> NewReplica
+        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
+        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+          "reassigned to catch up with the leader")
       case true =>
+        //4. Wait until all replicas in RAR are in sync with the leader.
         val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
-        // mark the new replicas as online
+        //5. replicas in RAR -> OnlineReplica
         reassignedReplicas.foreach { replica =>
           replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
             replica)), OnlineReplica)
         }
-        // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
+        //6. Set AR to RAR in memory.
+        //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
+        //   a new AR (using RAR) and same isr to every broker in RAR
         moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
-        // stop older replicas
+        //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
+        //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
         stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
-        // write the new list of replicas for this partition in zookeeper
+        //10. Update AR in ZK with RAR.
         updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
-        // update the /admin/reassign_partitions path to remove this partition
+        //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
         controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
-        // after electing leader, the replicas and isr information changes, so resend the update metadata request
+        //12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
         sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
-      case false =>
-        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
-          "reassigned not yet caught up with the leader")
-        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
-        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
-        // write the expanded list of replicas to zookeeper
-        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
-        // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
-        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
-          newAndOldReplicas.toSeq)
-        // start new replicas
-        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
-        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
-          "reassigned to catch up with the leader")
     }
   }
 

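For the reassignment walk-through in the doc comment above, the step numbers map to simple set operations on OAR and RAR. A worked example with the OAR = {1,2,3}, RAR = {4,5,6} values the comment uses (plain Scala sets, not controller code):

    // Set arithmetic behind the reassignment steps, for OAR = {1,2,3}, RAR = {4,5,6}.
    object ReassignmentSetsSketch {
      def main(args: Array[String]): Unit = {
        val oar = Set(1, 2, 3)           // original assigned replicas
        val rar = Set(4, 5, 6)           // reassigned (target) replicas

        val expandedAr  = oar ++ rar     // step 1: AR written to ZK = OAR + RAR = {1,2,3,4,5,6}
        val newReplicas = rar -- oar     // step 3: replicas moved to NewReplica = {4,5,6}
        val oldReplicas = oar -- rar     // steps 8-9: replicas taken offline and then deleted = {1,2,3}
        val finalAr     = rar            // step 10: AR written to ZK once RAR has caught up

        println(s"expanded AR=$expandedAr start=$newReplicas stop=$oldReplicas final AR=$finalAr")
      }
    }
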
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index a47b142..fd9200f 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -26,19 +26,20 @@ trait PartitionLeaderSelector {
    * @param topicAndPartition          The topic and partition whose leader needs to be elected
    * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
    * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
-   * @return The leader and isr request, with the newly selected leader info, to send to the brokers
-   * Also, returns the list of replicas the returned leader and isr request should be sent to
-   * This API selects a new leader for the input partition
+   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
+   * the LeaderAndIsrRequest.
    */
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
 
 }
 
 /**
- * This API selects a new leader for the input partition -
- * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
- * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+ * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
+ * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
+ *    isr as the new isr.
+ * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
  * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
+ * Replicas to receive LeaderAndIsr request = live assigned replicas
  * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
  */
 class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
@@ -82,7 +83,9 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
 }
 
 /**
- * Picks one of the alive in-sync reassigned replicas as the new leader.
+ * New leader = a live in-sync reassigned replica
+ * New isr = current isr
+ * Replicas to receive LeaderAndIsr request = reassigned replicas
  */
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[ReassignedPartitionLeaderSelector]: "
@@ -94,7 +97,8 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
     val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
+                                                                             currentLeaderAndIsr.isr.contains(r))
     val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
     newLeaderOpt match {
       case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
@@ -106,16 +110,16 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
               " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
           case _ =>
             throw new StateChangeFailedException("None of the reassigned replicas for partition " +
-              "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
+              "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
         }
     }
   }
 }
 
 /**
- * Picks the preferred replica as the new leader if -
- * 1. It is already not the current leader
- * 2. It is alive
+ * New leader = preferred (first assigned) replica (if in isr and alive);
+ * New isr = current isr;
+ * Replicas to receive LeaderAndIsr request = assigned replicas
  */
 class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
 with Logging {
@@ -145,8 +149,9 @@ with Logging {
 }
 
 /**
- * Picks one of the alive replicas (other than the current leader) in ISR as
- * new leader, fails if there are no other replicas in ISR.
+ * New leader = replica in isr that's not being shut down;
+ * New isr = current isr - shutdown replica;
+ * Replicas to receive LeaderAndIsr request = live assigned replicas
  */
 class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         extends PartitionLeaderSelector
@@ -164,8 +169,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
     val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
     val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
 
-    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader &&
-                                                            !controllerContext.shuttingDownBrokerIds.contains(brokerId))
+    val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
     val newLeaderOpt = newIsr.headOption
     newLeaderOpt match {
       case Some(newLeader) =>
@@ -174,8 +178,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
-          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
+        throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
+          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
 }

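The selector comments above all follow the same contract: return the new leader, the new isr, and the set of replicas that should receive the resulting LeaderAndIsrRequest. A minimal sketch of the controlled-shutdown rule as described (a standalone function with simplified arguments, not the actual ControlledShutdownLeaderSelector):

    // Controlled shutdown: drop shutting-down brokers from the isr and take the
    // head of what remains as the new leader; None models the failure case that
    // the real selector surfaces as a StateChangeFailedException.
    object ControlledShutdownSketch {
      def selectLeaderAndIsr(currentIsr: Seq[Int], shuttingDown: Set[Int]): Option[(Int, Seq[Int])] = {
        val newIsr = currentIsr.filterNot(shuttingDown.contains)
        newIsr.headOption.map(newLeader => (newLeader, newIsr))
      }

      def main(args: Array[String]): Unit = {
        println(selectLeaderAndIsr(Seq(1, 2, 3), Set(1)))  // broker 1 shutting down: leader 2, isr List(2, 3)
        println(selectLeaderAndIsr(Seq(1), Set(1)))        // nothing left in the isr: None
      }
    }
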
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5859ce7..ac4262a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -119,7 +119,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
   /**
    * This API exercises the partition's state machine. It ensures that every state transition happens from a legal
-   * previous state to the target state.
+   * previous state to the target state. Valid state transitions are:
+   * NonExistentPartition -> NewPartition:
+   * --load assigned replicas from ZK to controller cache
+   *
+   * NewPartition -> OnlinePartition
+   * --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
+   * --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker
+   *
+   * OnlinePartition,OfflinePartition -> OnlinePartition
+   * --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
+   * --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
+   *
+   * NewPartition,OnlinePartition -> OfflinePartition
+   * --nothing other than marking partition state as Offline
+   *
+   * OfflinePartition -> NonExistentPartition
+   * --nothing other than marking the partition state as NonExistentPartition
    * @param topic       The topic of the partition for which the state transition is invoked
    * @param partition   The partition for which the state transition is invoked
    * @param targetState The end state that the partition should be moved to
@@ -273,8 +289,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   }
 
   /**
-   * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
-   * for the input offline partition
+   * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
+   * It invokes the leader election API to elect a leader for the input offline partition
    * @param topic               The topic of the offline partition
    * @param partition           The offline partition
    * @param leaderSelector      Specific leader selector (e.g., offline/reassigned/etc.)

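The legal partition state transitions added to the header comment above can be read as a small table. A sketch of that table as data (state names mirror the documented ones; the real PartitionStateMachine enforces this inside handleStateChange together with the ZK writes and broker requests):

    // Partition state transition table from the doc comment, expressed as
    // target state -> legal previous states. Illustrative only.
    object PartitionStateSketch {
      sealed trait PartitionState
      case object NonExistentPartition extends PartitionState
      case object NewPartition         extends PartitionState
      case object OnlinePartition      extends PartitionState
      case object OfflinePartition     extends PartitionState

      val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
        NewPartition         -> Set(NonExistentPartition),
        OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
        OfflinePartition     -> Set(NewPartition, OnlinePartition),
        NonExistentPartition -> Set(OfflinePartition))

      def isLegal(from: PartitionState, to: PartitionState): Boolean =
        validPreviousStates.get(to).exists(_.contains(from))
    }
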
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ad4ee53..483559a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -58,8 +58,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     initializeReplicaState()
     hasStarted.set(true)
     // move all Online replicas to Online
-    handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
-      controllerContext.liveBrokerIds.toSeq), OnlineReplica)
+    handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
     info("Started replica state machine with initial state -> " + replicaState.toString())
   }
 
@@ -95,7 +94,23 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
 
   /**
    * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
-   * previous state to the target state.
+   * previous state to the target state. Valid state transitions are:
+   * NonExistentReplica --> NewReplica
+   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+   *
+   * NewReplica -> OnlineReplica
+   * --add the new replica to the assigned replica list if needed
+   *
+   * OnlineReplica,OfflineReplica -> OnlineReplica
+   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+   *
+   * NewReplica,OnlineReplica -> OfflineReplica
+   * --send StopReplicaRequest to the replica (w/o deletion)
+   * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker.
+   *
+   * OfflineReplica -> NonExistentReplica
+   * --send StopReplicaRequest to the replica (with deletion)
+   *
    * @param topic       The topic of the replica for which the state transition is invoked
    * @param partition   The partition of the replica for which the state transition is invoked
    * @param replicaId   The replica for which the state transition is invoked
@@ -228,20 +243,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
-    brokerIds.map { brokerId =>
-      val partitionsAssignedToThisBroker =
-        controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
-      if(partitionsAssignedToThisBroker.size == 0)
-        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
-      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
-    }.flatten.toSet
-  }
-
-  def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
-    controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
-  }
-
   /**
    * This is the zookeeper listener that triggers all the state transitions for a replica
    */

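One detail worth calling out from the replica transitions above: both paths that stop a replica send a StopReplicaRequest, but only the move to NonExistentReplica sets the delete flag. A sketch of that distinction (illustrative dispatch, not the actual ReplicaStateMachine):

    // StopReplica delete flag per target state, as documented above:
    // OfflineReplica keeps the logs on disk, NonExistentReplica removes them.
    object StopReplicaFlagSketch {
      sealed trait ReplicaState
      case object OfflineReplica     extends ReplicaState
      case object NonExistentReplica extends ReplicaState

      def deletePartition(targetState: ReplicaState): Boolean = targetState match {
        case OfflineReplica     => false  // StopReplica (delete = false): replica stops fetching, logs stay
        case NonExistentReplica => true   // StopReplica (delete = true): replica's logs are deleted
      }
    }
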
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 73902b2..b42e52b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -185,12 +185,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
-    val replicas = getReplicasForPartition(zkClient, topic, partition)
-    debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas))
-    replicas.contains(brokerId.toString)
-  }
-
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val timestamp = SystemTime.milliseconds.toString
@@ -500,8 +494,6 @@ object ZkUtils extends Logging {
     client.exists(path)
   }
 
-  def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1)
-
   def getCluster(zkClient: ZkClient) : Cluster = {
     val cluster = new Cluster
     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
@@ -571,17 +563,6 @@ object ZkUtils extends Logging {
     ret
   }
 
-  def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]):
-  mutable.Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    for((topic, partitionAssignment) <- topicPartitionAssignment){
-      for((partition, replicaAssignment) <- partitionAssignment){
-        ret.put((topic, partition), replicaAssignment)
-      }
-    }
-    ret
-  }
-
   def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
     getPartitionAssignmentForTopics(zkClient, topics).map { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
@@ -591,19 +572,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Seq[(String, Int)] = {
-    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics)
-    topicsAndPartitions.map { topicAndPartitionMap =>
-      val topic = topicAndPartitionMap._1
-      val partitionMap = topicAndPartitionMap._2
-      val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
-      val relevantPartitions = relevantPartitionsMap.map(_._1)
-      for(relevantPartition <- relevantPartitions) yield {
-        (topic, relevantPartition)
-      }
-    }.flatten[(String, Int)].toSeq
-  }
-
   def getPartitionsBeingReassigned(zkClient: ZkClient): Map[TopicAndPartition, ReassignedPartitionsContext] = {
     // read the partitions and their new replica list
     val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1
@@ -677,17 +645,6 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
-    Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
-      // read all the partitions and their assigned replicas into a map organized by
-      // { replica id -> partition 1, partition 2...
-      val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId)
-      if(partitionsAssignedToThisBroker.size == 0)
-        info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
-      partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
-    }.flatten
-  }
-  
   def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = {
     // read the partitions and their new replica list
     val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1