Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/25 17:32:11 UTC

git commit: kafka-813; Minor cleanup in Controller; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao

Updated Branches:
  refs/heads/0.8 51421fcc0 -> 28ee78553


kafka-813; Minor cleanup in Controller; patched by Swapnil Ghike; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/28ee7855
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/28ee7855
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/28ee7855

Branch: refs/heads/0.8
Commit: 28ee7855360b8e6ea690fc802dac4eaa60ad81e2
Parents: 51421fc
Author: Swapnil Ghike <sg...@linkedin.com>
Authored: Mon Mar 25 09:31:57 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Mar 25 09:31:57 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/api/LeaderAndIsrRequest.scala |    7 +-
 .../kafka/common/PartitionOfflineException.scala   |   28 ---------
 .../controller/ControllerChannelManager.scala      |    6 +-
 .../scala/kafka/controller/KafkaController.scala   |   47 +++++++++-----
 .../kafka/controller/PartitionLeaderSelector.scala |   44 +++++++++-----
 .../kafka/controller/PartitionStateMachine.scala   |   28 ++++-----
 .../kafka/controller/ReplicaStateMachine.scala     |    6 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |    2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |    6 +-
 .../api/RequestResponseSerializationTest.scala     |    2 +-
 .../unit/kafka/server/LeaderElectionTest.scala     |    3 +-
 11 files changed, 87 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index b40522d..3b7ee24 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -83,7 +83,6 @@ case class PartitionStateInfo(val leaderIsrAndControllerEpoch: LeaderIsrAndContr
 
 object LeaderAndIsrRequest {
   val CurrentVersion = 0.shortValue
-  val DefaultClientId = ""
   val IsInit: Boolean = true
   val NotInit: Boolean = false
   val DefaultAckTimeout: Int = 1000
@@ -126,9 +125,9 @@ case class LeaderAndIsrRequest (versionId: Short,
     extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) {
 
   def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], liveBrokers: Set[Broker], controllerId: Int,
-           controllerEpoch: Int, correlationId: Int) = {
-    this(LeaderAndIsrRequest.CurrentVersion, correlationId, LeaderAndIsrRequest.DefaultClientId, LeaderAndIsrRequest.DefaultAckTimeout,
-      controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
+           controllerEpoch: Int, correlationId: Int, clientId: String) = {
+    this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, LeaderAndIsrRequest.DefaultAckTimeout,
+         controllerId, controllerEpoch, partitionStateInfos, liveBrokers)
   }
 
   def writeTo(buffer: ByteBuffer) {

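Note (not part of the patch): with DefaultClientId removed, callers of the auxiliary constructor now pass an explicit clientId. A minimal sketch of the new call shape, mirroring the updated serialization test further down; the topic name, broker ids and clientId below are illustrative only.

    import kafka.api.{LeaderAndIsr, LeaderAndIsrRequest, PartitionStateInfo}
    import kafka.cluster.Broker
    import kafka.controller.LeaderIsrAndControllerEpoch

    // Illustrative values only.
    val leaderIsrAndEpoch   = LeaderIsrAndControllerEpoch(new LeaderAndIsr(0, 1, List(0, 1), 2), 1)
    val partitionStateInfos = Map(("test-topic", 0) -> PartitionStateInfo(leaderIsrAndEpoch, 2))
    // controllerId = 0, controllerEpoch = 1, correlationId = 0, explicit clientId
    val request = new LeaderAndIsrRequest(partitionStateInfos, Set.empty[Broker], 0, 1, 0,
                                          "id_0-host_localhost-port_9092")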
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/common/PartitionOfflineException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/PartitionOfflineException.scala b/core/src/main/scala/kafka/common/PartitionOfflineException.scala
deleted file mode 100644
index 3367708..0000000
--- a/core/src/main/scala/kafka/common/PartitionOfflineException.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-
-/**
- * This exception is thrown by the leader elector in the controller when leader election fails for a partition since
- * all the replicas for a partition are offline
- */
-class PartitionOfflineException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
-  def this(message: String) = this(message, null)
-  def this() = this(null, null)
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 6e563d2..f7a7bd4 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -144,7 +144,8 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit, controllerId: Int)
+class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
+                                   controllerId: Int, clientId: String)
   extends  Logging {
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
   val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
@@ -190,7 +191,8 @@ class ControllerBrokerRequestBatch(sendRequest: (Int, RequestOrResponse, (Reques
       val partitionStateInfos = m._2.toMap
       val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet
       val leaders = liveBrokers.filter(b => leaderIds.contains(b.id))
-      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId)
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId,
+                                                        clientId)
       for (p <- partitionStateInfos) {
         val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower"
         stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " +

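Note (not part of the patch): every ControllerBrokerRequestBatch is now built with the controller's clientId so it can be stamped on outgoing requests. A one-line sketch of the call as the state machines below use it, assuming a controller: KafkaController is in scope.

    val brokerRequestBatch =
      new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId, controller.clientId)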
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 25a8cfe..6e07096 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -46,7 +46,7 @@ class ControllerContext(val zkClient: ZkClient,
                         val correlationId: AtomicInteger = new AtomicInteger(0),
                         var allTopics: Set[String] = Set.empty,
                         var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var allLeaders: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
+                        var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
                         var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
                           new mutable.HashMap,
                         var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
@@ -87,10 +87,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     config.brokerId)
+  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, config.brokerId)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(sendRequest, this.config.brokerId, this.clientId)
   registerControllerChangedListener()
 
   newGauge(
@@ -100,8 +101,21 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   )
 
+  newGauge(
+    "OfflinePartitionsCount",
+    new Gauge[Int] {
+      def getValue: Int = {
+        controllerContext.controllerLock synchronized {
+          controllerContext.partitionLeadershipInfo.count(p => !controllerContext.liveBrokerIds.contains(p._2.leaderAndIsr.leader))
+        }
+      }
+    }
+  )
+
   def epoch = controllerContext.epoch
 
+  def clientId = "id_%d-host_%s-port_%d".format(config.brokerId, config.hostName, config.port)
+
   /**
    * JMX operation to initiate clean shutdown of a broker. On clean shutdown,
    * the controller first determines the partitions that the shutting down
@@ -137,8 +151,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       }
 
       def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
-        trace("All leaders = " + controllerContext.allLeaders.mkString(","))
-        controllerContext.allLeaders.filter {
+        trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
+        controllerContext.partitionLeadershipInfo.filter {
           case (topicAndPartition, leaderIsrAndControllerEpoch) =>
             leaderIsrAndControllerEpoch.leaderAndIsr.leader == id && controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
         }.map(_._1)
@@ -151,11 +165,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         val (topic, partition) = topicAndPartition.asTuple
         // move leadership serially to relinquish lock.
         controllerContext.controllerLock synchronized {
-          controllerContext.allLeaders.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
+          controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               partitionStateMachine.handleStateChanges(Set(topicAndPartition), OnlinePartition,
-                controlledShutdownPartitionLeaderSelector)
-              val newLeaderIsrAndControllerEpoch = controllerContext.allLeaders(topicAndPartition)
+                                                       controlledShutdownPartitionLeaderSelector)
+              val newLeaderIsrAndControllerEpoch = controllerContext.partitionLeadershipInfo(topicAndPartition)
 
               // mark replica offline only if leadership was moved successfully
               if (newLeaderIsrAndControllerEpoch.leaderAndIsr.leader != currLeaderIsrAndControllerEpoch.leaderAndIsr.leader)
@@ -180,7 +194,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         allPartitionsAndReplicationFactorOnBroker foreach {
           case(topicAndPartition, replicationFactor) =>
             val (topic, partition) = topicAndPartition.asTuple
-            if (controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader != id) {
+            if (controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader != id) {
               brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topic, partition, deletePartition = false)
               removeReplicaFromIsr(topic, partition, id) match {
                 case Some(updatedLeaderIsrAndControllerEpoch) =>
@@ -289,7 +303,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
 
     val deadBrokersSet = deadBrokers.toSet
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
-    val partitionsWithoutLeader = controllerContext.allLeaders.filter(partitionAndLeader =>
+    val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
       deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
@@ -321,7 +335,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
-    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition)
+    partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
     replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
   }
 
@@ -450,7 +464,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
-    controllerContext.allLeaders = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
+    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
     // update the leader and isr cache for all existing partitions from Zookeeper
     updateLeaderAndIsrCache()
     // start the channel manager
@@ -482,7 +496,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
     // check if they are already completed
     val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
-      controllerContext.allLeaders(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
+      controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
     controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
     info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
@@ -502,7 +516,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
       controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
         case true =>
-          controllerContext.allLeaders.put(topicPartition, leaderIsrAndControllerEpoch)
+          controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
         case false =>
           debug("While refreshing controller's leader and isr cache, leader %d for ".format(leaderIsrAndControllerEpoch.leaderAndIsr.leader) +
             "partition %s is dead, just ignore it".format(topicPartition))
@@ -522,7 +536,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private def moveReassignedPartitionLeaderIfRequired(topicAndPartition: TopicAndPartition,
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -626,7 +640,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def removePartitionsFromPreferredReplicaElection(partitionsToBeRemoved: Set[TopicAndPartition]) {
     for(partition <- partitionsToBeRemoved) {
       // check the status
-      val currentLeader = controllerContext.allLeaders(partition).leaderAndIsr.leader
+      val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
       val preferredReplica = controllerContext.partitionReplicaAssignment(partition).head
       if(currentLeader == preferredReplica) {
         info("Partition %s completed preferred replica leader election. New leader is %d".format(partition, preferredReplica))
@@ -965,7 +979,6 @@ case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
 case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int)
 
 object ControllerStats extends KafkaMetricsGroup {
-  val offlinePartitionRate = newMeter("OfflinePartitionsPerSec",  "partitions", TimeUnit.SECONDS)
-  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec",  "elections", TimeUnit.SECONDS)
+  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
   val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }

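Note (not part of the patch): the OfflinePartitionsPerSec meter is dropped in favour of an OfflinePartitionsCount gauge derived from the renamed partitionLeadershipInfo cache, and the controller now exposes a clientId string. A self-contained sketch of both over plain collections; the broker ids, host and port are made up.

    // A partition counts as offline when its cached leader is not a live broker.
    val partitionLeaders = Map("test-0" -> 1, "test-1" -> 3)   // partition -> leader broker id
    val liveBrokerIds    = Set(1, 2)
    val offlinePartitionsCount =
      partitionLeaders.count { case (_, leader) => !liveBrokerIds.contains(leader) }
    // offlinePartitionsCount == 1 here, since broker 3 is not live

    // The controller's new clientId format, for an assumed broker id, host and port.
    val clientId = "id_%d-host_%s-port_%d".format(1, "broker1.example.com", 9092)
    // clientId == "id_1-host_broker1.example.com-port_9092"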
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3ed9b7e..d295781 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -18,14 +18,14 @@ package kafka.controller
 
 import kafka.api.LeaderAndIsr
 import kafka.utils.Logging
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 
 trait PartitionLeaderSelector {
 
   /**
    * @param topicAndPartition          The topic and partition whose leader needs to be elected
    * @param currentLeaderAndIsr        The current leader and isr of input partition read from zookeeper
-   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
    * @return The leader and isr request, with the newly selected leader info, to send to the brokers
    * Also, returns the list of replicas the returned leader and isr request should be sent to
    * This API selects a new leader for the input partition
@@ -38,7 +38,7 @@ trait PartitionLeaderSelector {
  * This API selects a new leader for the input partition -
  * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
  * 2. Else, it picks some alive broker from the assigned replica list as the new leader
- * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+ * 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
  * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
  */
 class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
@@ -57,8 +57,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
               .format(liveAssignedReplicasToThisPartition.mkString(",")))
             liveAssignedReplicasToThisPartition.isEmpty match {
               case true =>
-                ControllerStats.offlinePartitionRate.mark()
-                throw new PartitionOfflineException(("No replica for partition " +
+                throw new NoReplicaOnlineException(("No replica for partition " +
                   "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                   " Assigned replicas are: [%s]".format(assignedReplicas))
               case false =>
@@ -76,30 +75,31 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
         (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
       case None =>
-        ControllerStats.offlinePartitionRate.mark()
-        throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
+        throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
     }
   }
 }
 
 /**
- * Picks one of the alive in-sync reassigned replicas as the new leader
+ * Picks one of the alive in-sync reassigned replicas as the new leader.
  */
 class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
   this.logIdent = "[ReassignedPartitionLeaderSelector]: "
 
+  /**
+   * The reassigned replicas are already in the ISR when selectLeader is called.
+   */
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
-    val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
+    val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
     val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
     val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
-    // pick any replica from the newly assigned replicas list that is in the ISR
-    val aliveReassignedReplicas = reassignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
-    val newLeaderOpt = aliveReassignedReplicas.headOption
+    val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+    val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
     newLeaderOpt match {
       case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
-        currentLeaderIsrZkPathVersion + 1), reassignedReplicas)
+        currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
       case None =>
-        reassignedReplicas.size match {
+        reassignedInSyncReplicas.size match {
           case 0 =>
             throw new StateChangeFailedException("List of reassigned replicas for partition " +
               " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
@@ -124,7 +124,7 @@ with Logging {
     val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
     val preferredReplica = assignedReplicas.head
     // check if preferred replica is the current leader
-    val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+    val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
     if(currentLeader == preferredReplica) {
       throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
         .format(preferredReplica, topicAndPartition))
@@ -177,6 +177,18 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
           " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
-
 }
 
+/**
+ * Essentially does nothing. Returns the current leader and ISR, and the current
+ * set of replicas assigned to a given topic/partition.
+ */
+class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
+
+  this.logIdent = "[NoOpLeaderSelector]: "
+
+  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
+    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
+  }
+}
\ No newline at end of file

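Note (not part of the patch): NoOpLeaderSelector implements the same PartitionLeaderSelector contract but never elects anyone; it echoes back the current leader/ISR and the partition's full replica assignment. A sketch of a call against it, assuming controllerContext, topicAndPartition and currentLeaderAndIsr are in scope.

    val noOpSelector = new NoOpLeaderSelector(controllerContext)
    val (leaderAndIsr, replicas) = noOpSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
    // leaderAndIsr is currentLeaderAndIsr, unchanged;
    // replicas is controllerContext.partitionReplicaAssignment(topicAndPartition)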
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index b25e9f4..654fa2e 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -20,7 +20,7 @@ import collection._
 import collection.JavaConversions._
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.api.LeaderAndIsr
-import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOfflineException}
+import kafka.common.{TopicAndPartition, StateChangeFailedException, NoReplicaOnlineException}
 import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.IZkChildListener
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
@@ -43,9 +43,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId)
-  val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
+  private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -86,7 +86,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
       for((topicAndPartition, partitionState) <- partitionState) {
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
-          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, offlinePartitionSelector)
+          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     } catch {
@@ -101,7 +101,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param targetState  The state that the partitions should be moved to
    */
   def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
-                         leaderSelector: PartitionLeaderSelector = offlinePartitionSelector) {
+                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
     info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
@@ -111,6 +111,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers)
     }catch {
       case e => error("Error while moving some partitions to %s state".format(targetState), e)
+      // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions
     }
   }
 
@@ -149,7 +150,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             case _ => // should never come here since illegal previous states are checked above
           }
           partitionState.put(topicAndPartition, OnlinePartition)
-          val leader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
+          val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to OnlinePartition with leader %d"
                                     .format(controllerId, controller.epoch, topicAndPartition, partitionState(topicAndPartition), leader))
            // post: partition has a leader
@@ -172,7 +173,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     } catch {
       case t: Throwable =>
         stateChangeLogger.error("Controller %d epoch %d initiated state change for partition %s from %s to %s failed"
-                                  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
+          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState), t)
     }
   }
 
@@ -232,7 +233,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
     liveAssignedReplicas.size match {
       case 0 =>
-        ControllerStats.offlinePartitionRate.mark()
         val failMsg = ("encountered error during state change of partition %s from New to Online, assigned replicas are [%s], " +
                        "live brokers are [%s]. No assigned replica is alive.")
                          .format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
@@ -253,14 +253,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           // GC pause
           brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
             topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
-          controllerContext.allLeaders.put(topicAndPartition, leaderIsrAndControllerEpoch)
-          partitionState.put(topicAndPartition, OnlinePartition)
+          controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
         } catch {
           case e: ZkNodeExistsException =>
             // read the controller epoch
             val leaderIsrAndEpoch = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topicAndPartition.topic,
               topicAndPartition.partition).get
-            ControllerStats.offlinePartitionRate.mark()
             val failMsg = ("encountered error while changing partition %s's state from New to Online since LeaderAndIsr path already " +
                            "exists with value %s and controller epoch %d")
                              .format(topicAndPartition, leaderIsrAndEpoch.leaderAndIsr.toString(), leaderIsrAndEpoch.controllerEpoch)
@@ -310,22 +308,20 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
       }
       val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
       // update the leader cache
-      controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
+      controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
       stateChangeLogger.trace("Controller %d epoch %d elected leader %d for Offline partition %s"
                                 .format(controllerId, controller.epoch, newLeaderAndIsr.leader, topicAndPartition))
       // store new leader and isr info in cache
       brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
         newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
     } catch {
-      case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas %s for partition %s are dead."
-        .format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(","), topicAndPartition) +
-        " Marking this partition offline", poe)
+      case nroe: NoReplicaOnlineException => throw nroe
       case sce =>
         val failMsg = "encountered error while electing leader for partition %s due to: %s.".format(topicAndPartition, sce.getMessage)
         stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
         throw new StateChangeFailedException(failMsg, sce)
     }
-    debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
+    debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
   }
 
   private def registerTopicChangeListener() = {

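Note (not part of the patch): the OfflinePartitionLeaderSelector now lives on KafkaController, and handleStateChanges defaults to the NoOpLeaderSelector, so callers that actually want an election must pass a selector explicitly. Both call shapes as this patch uses them, assuming a partitionStateMachine, a controller and a partitions set are in scope.

    // Explicit selector: elects a new leader for new/offline partitions.
    partitionStateMachine.handleStateChanges(partitions, OnlinePartition, controller.offlinePartitionSelector)

    // Default selector: NoOpLeaderSelector, which leaves the current leader and ISR untouched.
    partitionStateMachine.handleStateChanges(partitions, OfflinePartition)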
http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 88058ec..5146f12 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -42,7 +42,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controller.config.brokerId)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.sendRequest, controllerId, controller.clientId)
   private val isShuttingDown = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -143,7 +143,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                                         .format(controllerId, controller.epoch, replicaId, topicAndPartition))
             case _ =>
               // check if the leader for this partition is alive or even exists
-                controllerContext.allLeaders.get(topicAndPartition) match {
+                controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
                 case Some(leaderIsrAndControllerEpoch) =>
                   controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
                     case true => // leader is alive
@@ -163,7 +163,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean =
-            controllerContext.allLeaders.get(topicAndPartition) match {
+            controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
               case Some(currLeaderIsrAndControllerEpoch) =>
                 if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
                   controller.removeReplicaFromIsr(topic, partition, replicaId) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index e7248c3..7298ccb 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -98,8 +98,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
    */
   private def registerStats() {
     BrokerTopicStats.getBrokerAllTopicsStats()
-    ControllerStats.offlinePartitionRate
     ControllerStats.uncleanLeaderElectionRate
+    ControllerStats.leaderElectionTimer
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 4dd1ba7..6c80c4c 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -381,7 +381,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       var leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
       assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
       // assertEquals(2, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(2, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(2, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
 
       leaderBeforeShutdown = leaderAfterShutdown
       controllerId = ZkUtils.getController(zkClient)
@@ -392,7 +392,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
       assertTrue(leaderAfterShutdown != leaderBeforeShutdown)
       // assertEquals(1, topicMetadata.partitionsMetadata.head.isr.size)
-      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
 
       leaderBeforeShutdown = leaderAfterShutdown
       controllerId = ZkUtils.getController(zkClient)
@@ -402,7 +402,7 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topic, zkClient)
       leaderAfterShutdown = topicMetadata.partitionsMetadata.head.leader.get.id
       assertTrue(leaderAfterShutdown == leaderBeforeShutdown)
-      assertEquals(1, controller.controllerContext.allLeaders(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
+      assertEquals(1, controller.controllerContext.partitionLeadershipInfo(TopicAndPartition("test", 1)).leaderAndIsr.isr.size)
     }
     finally {
       servers.foreach(_.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 02ff81f..0f15718 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -89,7 +89,7 @@ object SerializationTestUtils{
     val leaderAndIsr2 = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader2, 1, isr2, 2), 1)
     val map = Map(((topic1, 0), PartitionStateInfo(leaderAndIsr1, 3)),
                   ((topic2, 0), PartitionStateInfo(leaderAndIsr2, 3)))
-    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0)
+    new LeaderAndIsrRequest(map.toMap, collection.immutable.Set[Broker](), 0, 1, 0, "")
   }
 
   def createTestLeaderAndIsrResponse() : LeaderAndIsrResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/28ee7855/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index ec1db2d..8f88177 100644
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -133,7 +133,8 @@ class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
     leaderAndIsr.put((topic, partitionId),
       new LeaderIsrAndControllerEpoch(new LeaderAndIsr(brokerId2, List(brokerId1, brokerId2)), 2))
     val partitionStateInfo = leaderAndIsr.mapValues(l => new PartitionStateInfo(l, 1)).toMap
-    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId, staleControllerEpoch, 0)
+    val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfo, brokers.toSet, controllerId,
+                                                      staleControllerEpoch, 0, "")
 
     controllerChannelManager.sendRequest(brokerId2, leaderAndIsrRequest, staleControllerEpochCallback)
     TestUtils.waitUntilTrue(() => staleControllerEpochDetected == true, 1000)