Posted to commits@kafka.apache.org by ne...@apache.org on 2013/11/01 18:28:18 UTC

git commit: KAFKA-1097 Race condition while reassigning low throughput partition leads to incorrect ISR information in zookeeper; reviewed by Jun Rao, Guozhang Wang

Updated Branches:
  refs/heads/trunk e602ed058 -> df7f7a255


KAFKA-1097 Race condition while reassigning low throughput partition leads to incorrect ISR information in zookeeper; reviewed by Jun Rao, Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/df7f7a25
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/df7f7a25
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/df7f7a25

Branch: refs/heads/trunk
Commit: df7f7a255bdd803cddfdf3fea2ace4f1ae366377
Parents: e602ed0
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Nov 1 10:27:36 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Nov 1 10:27:36 2013 -0700

----------------------------------------------------------------------
 .../kafka/admin/ReassignPartitionsCommand.scala |  14 +-
 .../main/scala/kafka/cluster/Partition.scala    |  49 +++++--
 .../common/NotAssignedReplicaException.scala    |  23 ++++
 .../kafka/controller/KafkaController.scala      | 127 +++++++++++++++----
 .../kafka/controller/ReplicaStateMachine.scala  |   2 +
 .../scala/kafka/server/ReplicaManager.scala     |  63 +++++----
 kafka-patch-review.py                           |   6 +-
 project/Build.scala                             |   2 +-
 8 files changed, 214 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2f706c9..70d1b81 100644
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -104,10 +104,14 @@ object ReassignPartitionsCommand extends Logging {
           }
         }
       } else if(options.has(topicsToMoveJsonFileOpt)) {
+        if(!options.has(brokerListOpt)) {
+          System.err.println("broker-list is required if topics-to-move-json-file is used")
+          parser.printHelpOn(System.err)
+          System.exit(1)
+        }
         val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt)
-        val brokerList = options.valueOf(brokerListOpt)
+        val brokerListToReassign = options.valueOf(brokerListOpt).split(',').map(_.toInt)
         val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile)
-        val brokerListToReassign = brokerList.split(',') map (_.toInt)
         val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString)
         val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign)
 
@@ -117,7 +121,6 @@ object ReassignPartitionsCommand extends Logging {
             topicInfo._2.head._2.size)
           partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2))
         }
-
       } else if (options.has(manualAssignmentJsonFileOpt)) {
         val manualAssignmentJsonFile =  options.valueOf(manualAssignmentJsonFileOpt)
         val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile)
@@ -175,8 +178,11 @@ object ReassignPartitionsCommand extends Logging {
         val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topicAndPartition.topic, topicAndPartition.partition)
         if(assignedReplicas == newReplicas)
           ReassignmentCompleted
-        else
+        else {
+          println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
+            " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), topicAndPartition))
           ReassignmentFailed
+        }
     }
   }
 }
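
The verification path above reduces to a direct comparison of the replica list currently
assigned in zookeeper against the list requested for reassignment. A minimal, standalone
sketch of that check (the ReassignmentStatus values here are simplified stand-ins for the
real admin types):

    sealed trait ReassignmentStatus
    case object ReassignmentCompleted extends ReassignmentStatus
    case object ReassignmentFailed extends ReassignmentStatus

    // A partition's reassignment is considered complete only when the replicas assigned
    // in zookeeper exactly match the requested replica list; otherwise the tool now
    // reports the mismatch instead of failing silently.
    def checkStatus(partition: String, assignedReplicas: Seq[Int], newReplicas: Seq[Int]): ReassignmentStatus =
      if (assignedReplicas == newReplicas)
        ReassignmentCompleted
      else {
        println(("ERROR: Assigned replicas (%s) don't match the list of replicas for reassignment (%s)" +
          " for partition %s").format(assignedReplicas.mkString(","), newReplicas.mkString(","), partition))
        ReassignmentFailed
      }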

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index d8078bd..02ccc17 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -20,7 +20,7 @@ import scala.collection._
 import kafka.admin.AdminUtils
 import kafka.utils._
 import java.lang.Object
-import kafka.api.LeaderAndIsr
+import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
 import kafka.server.ReplicaManager
 import com.yammer.metrics.core.Gauge
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
 
 
 /**
@@ -131,25 +131,34 @@ class Partition(val topic: String,
     assignedReplicaMap.values.toSet
   }
 
+  def removeReplica(replicaId: Int) {
+    assignedReplicaMap.remove(replicaId)
+  }
+
   def getLeaderEpoch(): Int = {
     leaderIsrUpdateLock synchronized {
       return this.leaderEpoch
     }
   }
 
-
   /**
    * Make the local replica the leader by resetting LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  and setting the new leader and ISR
    */
-  def makeLeader(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, correlationId: Int): Boolean = {
+  def makeLeader(controllerId: Int,
+                 partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val allReplicas = partitionStateInfo.allReplicas
+      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-
+      // add replicas that are new
+      allReplicas.foreach(replica => getOrCreateReplica(replica))
       val newInSyncReplicas = leaderAndIsr.isr.map(r => getOrCreateReplica(r)).toSet
+      // remove assigned replicas that have been removed by the controller
+      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
       // reset LogEndOffset for remote replicas
       assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
       inSyncReplicas = newInSyncReplicas
@@ -165,16 +174,24 @@ class Partition(val topic: String,
   /**
    *  Make the local replica the follower by setting the new leader and ISR to empty
    */
-  def makeFollower(controllerId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, leaders: Set[Broker], correlationId: Int): Boolean = {
+  def makeFollower(controllerId: Int,
+                   partitionStateInfo: PartitionStateInfo,
+                   leaders: Set[Broker], correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
+      val allReplicas = partitionStateInfo.allReplicas
+      val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
       val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr
       val newLeaderBrokerId: Int = leaderAndIsr.leader
+      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path
+      controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
       // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
       leaders.find(_.id == newLeaderBrokerId) match {
         case Some(leaderBroker) =>
-          // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-          // to maintain the decision maker controller's epoch in the zookeeper path
-          controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
+          // add replicas that are new
+          allReplicas.foreach(r => getOrCreateReplica(r))
+          // remove assigned replicas that have been removed by the controller
+          (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
           inSyncReplicas = Set.empty[Replica]
           leaderEpoch = leaderAndIsr.leaderEpoch
           zkVersion = leaderAndIsr.zkVersion
@@ -192,7 +209,13 @@ class Partition(val topic: String,
   def updateLeaderHWAndMaybeExpandIsr(replicaId: Int, offset: Long) {
     leaderIsrUpdateLock synchronized {
       debug("Recording follower %d position %d for partition [%s,%d].".format(replicaId, offset, topic, partitionId))
-      val replica = getOrCreateReplica(replicaId)
+      val replicaOpt = getReplica(replicaId)
+      if(!replicaOpt.isDefined) {
+        throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d for partition [%s,%d] since the replica %d" +
+          " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+            offset, topic, partitionId, replicaId, assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+      }
+      val replica = replicaOpt.get
       replica.logEndOffset = offset
 
       // check if this replica needs to be added to the ISR
@@ -200,7 +223,11 @@ class Partition(val topic: String,
         case Some(leaderReplica) =>
           val replica = getReplica(replicaId).get
           val leaderHW = leaderReplica.highWatermark
-          if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
+          // For a replica to get added back to ISR, it has to satisfy 3 conditions-
+          // 1. It is not already in the ISR
+          // 2. It is part of the assigned replica list. See KAFKA-1097
+          // 3. Its log end offset >= leader's high watermark
+          if (!inSyncReplicas.contains(replica) && assignedReplicas.map(_.brokerId).contains(replicaId) && replica.logEndOffset >= leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
             info("Expanding ISR for partition [%s,%d] from %s to %s"
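
The three conditions listed in the comment above collapse into a single predicate on the
follower. A minimal sketch, assuming simplified stand-ins for the replica state held by
kafka.cluster.Partition:

    // Hypothetical, simplified replica state; the real class is kafka.cluster.Replica.
    case class FollowerState(brokerId: Int, logEndOffset: Long)

    // A follower may (re-)enter the ISR only if it is not already in the ISR, it is still
    // part of the assigned replica list (the KAFKA-1097 fix), and its log end offset has
    // reached the leader's high watermark.
    def mayExpandIsr(follower: FollowerState,
                     inSyncReplicas: Set[Int],
                     assignedReplicas: Set[Int],
                     leaderHighWatermark: Long): Boolean =
      !inSyncReplicas.contains(follower.brokerId) &&
        assignedReplicas.contains(follower.brokerId) &&
        follower.logEndOffset >= leaderHighWatermark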

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
new file mode 100644
index 0000000..409d112
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotAssignedReplicaException.scala
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class NotAssignedReplicaException(message: String, cause: Throwable) extends RuntimeException(message, cause) {
+  def this(message: String) = this(message, null)
+  def this() = this(null, null)
+}
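
The pattern Partition.updateLeaderHWAndMaybeExpandIsr now follows with this exception can
be sketched as below: unknown follower ids are rejected rather than silently re-created as
replicas (the helper and its map-based state are hypothetical, for illustration only):

    import kafka.common.NotAssignedReplicaException

    // Record a follower's fetch position only if it is one of the assigned replicas;
    // otherwise surface the inconsistency to the caller.
    def recordFollowerPosition(replicaId: Int, offset: Long,
                               logEndOffsets: Map[Int, Long]): Map[Int, Long] =
      logEndOffsets.get(replicaId) match {
        case Some(_) => logEndOffsets.updated(replicaId, offset)
        case None => throw new NotAssignedReplicaException(
          "Replica %d is not one of the assigned replicas %s"
            .format(replicaId, logEndOffsets.keys.mkString(",")))
      }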

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 88d130f..88792c2 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -35,6 +35,7 @@ import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import scala.Some
 import kafka.common.TopicAndPartition
+import org.apache.log4j.Logger
 
 class ControllerContext(val zkClient: ZkClient,
                         val zkSessionTimeout: Int,
@@ -105,6 +106,7 @@ object KafkaController extends Logging {
 class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup with KafkaControllerMBean {
   this.logIdent = "[Controller " + config.brokerId + "]: "
   private var isRunning = true
+  private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
   private val partitionStateMachine = new PartitionStateMachine(this)
   private val replicaStateMachine = new ReplicaStateMachine(this)
@@ -359,17 +361,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Reassigning replicas for a partition goes through a few stages -
    * RAR = Reassigned replicas
    * AR = Original list of replicas for partition
-   * 1. Start new replicas RAR - AR.
-   * 2. Wait until new replicas are in sync with the leader
-   * 3. If the leader is not in RAR, elect a new leader from RAR
-   * 4. Stop old replicas AR - RAR
-   * 5. Write new AR
-   * 6. Remove partition from the /admin/reassign_partitions path
+   * 1. Write new AR = AR + RAR
+   * 2. Start new replicas RAR - AR.
+   * 3. Wait until new replicas are in sync with the leader
+   * 4. If the leader is not in RAR, elect a new leader from RAR
+   * 5. Stop old replicas AR - RAR
+   * 6. Write new AR = RAR
+   * 7. Remove partition from the /admin/reassign_partitions path
    */
   def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
       case true =>
+        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
         // mark the new replicas as online
         reassignedReplicas.foreach { replica =>
           replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
@@ -378,9 +382,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
         moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
         // stop older replicas
-        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
         // write the new list of replicas for this partition in zookeeper
-        updateAssignedReplicasForPartition(topicAndPartition, reassignedPartitionContext)
+        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
         // update the /admin/reassign_partitions path to remove this partition
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
@@ -390,8 +394,15 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       case false =>
         info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned not yet caught up with the leader")
+        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+        // write the expanded list of replicas to zookeeper
+        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+        // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+          newAndOldReplicas.toSeq)
         // start new replicas
-        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext)
+        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
         info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
           "reassigned to catch up with the leader")
     }
@@ -602,6 +613,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                                                       reassignedPartitionContext: ReassignedPartitionsContext) {
     val reassignedReplicas = reassignedPartitionContext.newReplicas
     val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
+    // change the assigned replica list to just the reassigned replicas in the cache so it gets sent out on the LeaderAndIsr
+    // request to the current or new leader. This will prevent it from adding the old replicas to the ISR
+    val oldAndNewReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
     if(!reassignedPartitionContext.newReplicas.contains(currentLeader)) {
       info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
         "is not in the new list of replicas %s. Re-electing leader".format(reassignedReplicas.mkString(",")))
@@ -613,6 +628,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         case true =>
           info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
             "is already in the new list of replicas %s and is alive".format(reassignedReplicas.mkString(",")))
+          // shrink replication factor and update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
+          updateLeaderEpochAndSendRequest(topicAndPartition, oldAndNewReplicas, reassignedReplicas)
         case false =>
           info("Leader %s for partition %s being reassigned, ".format(currentLeader, topicAndPartition) +
             "is already in the new list of replicas %s but is dead".format(reassignedReplicas.mkString(",")))
@@ -622,12 +639,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def stopOldReplicasOfReassignedPartition(topicAndPartition: TopicAndPartition,
-                                                   reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                   reassignedPartitionContext: ReassignedPartitionsContext,
+                                                   oldReplicas: Set[Int]) {
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
-    // send stop replica state change request to the old replicas
-    val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
     // first move the replica to offline state (the controller removes it from the ISR)
     oldReplicas.foreach { replica =>
       replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
@@ -639,31 +654,44 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
-                                                 reassignedPartitionContext: ReassignedPartitionsContext) {
-    val reassignedReplicas = reassignedPartitionContext.newReplicas
+                                                 replicas: Seq[Int]) {
     val partitionsAndReplicasForThisTopic = controllerContext.partitionReplicaAssignment.filter(_._1.topic.equals(topicAndPartition.topic))
-    partitionsAndReplicasForThisTopic.put(topicAndPartition, reassignedReplicas)
+    partitionsAndReplicasForThisTopic.put(topicAndPartition, replicas)
     updateAssignedReplicasForPartition(topicAndPartition, partitionsAndReplicasForThisTopic)
-    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, reassignedReplicas.mkString(",")))
+    info("Updated assigned replicas for partition %s being reassigned to %s ".format(topicAndPartition, replicas.mkString(",")))
     // update the assigned replica list after a successful zookeeper write
-    controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas)
-    // stop watching the ISR changes for this partition
-    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
-      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
+    controllerContext.partitionReplicaAssignment.put(topicAndPartition, replicas)
   }
 
   private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition,
-                                                     reassignedPartitionContext: ReassignedPartitionsContext) {
+                                                     reassignedPartitionContext: ReassignedPartitionsContext,
+                                                     newReplicas: Set[Int]) {
     // send the start replica request to the brokers in the reassigned replicas list that are not in the assigned
     // replicas list
-    val assignedReplicaSet = Set.empty[Int] ++ controllerContext.partitionReplicaAssignment(topicAndPartition)
-    val reassignedReplicaSet = Set.empty[Int] ++ reassignedPartitionContext.newReplicas
-    val newReplicas: Seq[Int] = (reassignedReplicaSet -- assignedReplicaSet).toSeq
     newReplicas.foreach { replica =>
       replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), NewReplica)
     }
   }
 
+  private def updateLeaderEpochAndSendRequest(topicAndPartition: TopicAndPartition, replicasToReceiveRequest: Seq[Int], newAssignedReplicas: Seq[Int]) {
+    brokerRequestBatch.newBatch()
+    updateLeaderEpoch(topicAndPartition.topic, topicAndPartition.partition) match {
+      case Some(updatedLeaderIsrAndControllerEpoch) =>
+        // send the shrunk assigned replica list to all the replicas, including the leader, so that it no longer
+        // allows old replicas to enter ISR
+        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasToReceiveRequest, topicAndPartition.topic,
+          topicAndPartition.partition, updatedLeaderIsrAndControllerEpoch, newAssignedReplicas)
+        brokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch, controllerContext.correlationId.getAndIncrement)
+        stateChangeLogger.trace(("Controller %d epoch %d sent LeaderAndIsr request %s with new assigned replica list %s " +
+          "to leader %d for partition being reassigned %s").format(config.brokerId, controllerContext.epoch, updatedLeaderIsrAndControllerEpoch,
+          newAssignedReplicas.mkString(","), updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader, topicAndPartition))
+      case None => // fail the reassignment
+        stateChangeLogger.error(("Controller %d epoch %d failed to send LeaderAndIsr request with new assigned replica list %s " +
+          "to leader for partition being reassigned %s").format(config.brokerId, controllerContext.epoch,
+          newAssignedReplicas.mkString(","), topicAndPartition))
+    }
+  }
+
   private def registerReassignedPartitionsListener() = {
     zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, new PartitionsReassignedListener(this))
   }
@@ -677,6 +705,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
+    // stop watching the ISR changes for this partition
+    zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
+      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
     // read the current list of reassigned partitions from zookeeper
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
     // remove this partition from that list
@@ -793,6 +824,52 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     finalLeaderIsrAndControllerEpoch
   }
 
+  /**
+   * Does not change leader or isr, but just increments the leader epoch
+   * @param topic topic
+   * @param partition partition
+   * @return the new leaderAndIsr with an incremented leader epoch, or None if leaderAndIsr is empty.
+   */
+  private def updateLeaderEpoch(topic: String, partition: Int): Option[LeaderIsrAndControllerEpoch] = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    debug("Updating leader epoch for partition %s.".format(topicAndPartition))
+    var finalLeaderIsrAndControllerEpoch: Option[LeaderIsrAndControllerEpoch] = None
+    var zkWriteCompleteOrUnnecessary = false
+    while (!zkWriteCompleteOrUnnecessary) {
+      // refresh leader and isr from zookeeper again
+      val leaderIsrAndEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
+      zkWriteCompleteOrUnnecessary = leaderIsrAndEpochOpt match {
+        case Some(leaderIsrAndEpoch) =>
+          val leaderAndIsr = leaderIsrAndEpoch.leaderAndIsr
+          val controllerEpoch = leaderIsrAndEpoch.controllerEpoch
+          if(controllerEpoch > epoch)
+            throw new StateChangeFailedException("Leader and isr path written by another controller. This probably " +
+              "means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
+              "controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
+          // increment the leader epoch even if there are no leader or isr changes to allow the leader to cache the expanded
+          // assigned replica list
+          val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+                                                 leaderAndIsr.isr, leaderAndIsr.zkVersion + 1)
+          // update the new leadership decision in zookeeper or retry
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
+            zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, epoch),
+            leaderAndIsr.zkVersion)
+          newLeaderAndIsr.zkVersion = newVersion
+          finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
+          if (updateSucceeded)
+            info("Updated leader epoch for partition %s to %d".format(topicAndPartition, newLeaderAndIsr.leaderEpoch))
+          updateSucceeded
+        case None =>
+          throw new IllegalStateException(("Cannot update leader epoch for partition %s as leaderAndIsr path is empty. " +
+            "This could mean we somehow tried to reassign a partition that doesn't exist").format(topicAndPartition))
+          true
+      }
+    }
+    finalLeaderIsrAndControllerEpoch
+  }
+
   class SessionExpirationListener() extends IZkStateListener with Logging {
     this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "
     @throws(classOf[Exception])
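
The reassignment stages documented in onPartitionReassignment come down to set arithmetic
over AR (the current assignment) and RAR (the requested reassignment). A small, hypothetical
example of the sets computed at each step:

    object ReassignmentSetsExample extends App {
      val ar  = Set(1, 2, 3)   // original assigned replicas for the partition (example values)
      val rar = Set(2, 3, 4)   // reassigned replicas requested through /admin/reassign_partitions

      val expandedAssignment = ar ++ rar   // step 1: AR + RAR is written to zookeeper first
      val replicasToStart    = rar -- ar   // step 2: only genuinely new replicas are started
      val replicasToStop     = ar -- rar   // step 5: old replicas stopped once RAR is in sync
      val finalAssignment    = rar         // step 6: AR shrunk back to exactly RAR

      println("expanded=%s start=%s stop=%s final=%s".format(
        expandedAssignment, replicasToStart, replicasToStop, finalAssignment))
    }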

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 212c05d..c52225a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -165,6 +165,8 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
           replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>
           assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          // send stop replica command to the replica so that it stops fetching from the leader
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
           val leaderAndIsrIsEmpty: Boolean =
             controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 7b8f89e..161f581 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -28,7 +28,7 @@ import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.common._
-import kafka.api.{StopReplicaRequest, LeaderAndIsrRequest}
+import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.{LeaderIsrAndControllerEpoch, KafkaController}
 import org.apache.log4j.Logger
 
@@ -143,7 +143,7 @@ class ReplicaManager(val config: KafkaConfig,
         controllerEpoch = stopReplicaRequest.controllerEpoch
         val responseMap = new HashMap[(String, Int), Short]
         // First stop fetchers for all partitions, then stop the corresponding replicas
-        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map{
+        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map {
           case (topic, partition) => TopicAndPartition(topic, partition)
         })
         for((topic, partitionId) <- stopReplicaRequest.partitions){
@@ -222,27 +222,27 @@ class ReplicaManager(val config: KafkaConfig,
         controllerEpoch = leaderAndISRRequest.controllerEpoch
 
         // First check partition's leader epoch
-        val partitionleaderIsrAndControllerEpoch = new HashMap[Partition, LeaderIsrAndControllerEpoch]()
+        val partitionState = new HashMap[Partition, PartitionStateInfo]()
         leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
           val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
           if (partitionLeaderEpoch < partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch) {
             // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
             // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-            partitionleaderIsrAndControllerEpoch.put(partition, partitionStateInfo.leaderIsrAndControllerEpoch)
+            partitionState.put(partition, partitionStateInfo)
           } else {
             // Otherwise record the error code in response
             stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
               "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
               .format(localBrokerId, correlationId, controllerId, partitionStateInfo.leaderIsrAndControllerEpoch.controllerEpoch,
-              partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition, partitionLeaderEpoch))
+              partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
             responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
           }
         }
 
-        val partitionsTobeLeader = partitionleaderIsrAndControllerEpoch
-          .filter{ case (partition, leaderIsrAndControllerEpoch) => leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
-        val partitionsTobeFollower = (partitionleaderIsrAndControllerEpoch -- partitionsTobeLeader.keys)
+        val partitionsTobeLeader = partitionState
+          .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
+        val partitionsTobeFollower = (partitionState -- partitionsTobeLeader.keys)
 
         if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
         if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
@@ -271,28 +271,30 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    *  TODO: the above may need to be fixed later
    */
-  private def makeLeaders(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+  private def makeLeaders(controllerId: Int, epoch: Int,
+                          partitionState: Map[Partition, PartitionStateInfo],
                           correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) = {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "starting the become-leader transition for partitions %s")
       .format(localBrokerId, correlationId, controllerId, epoch,
-      partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+      partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
 
-    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+    for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
       // First stop fetchers for all the partitions
-      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
       stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
       // Update the partition information to be the leader
-      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeLeader(controllerId, leaderIsrAndControllerEpoch, correlationId)}
+      partitionState.foreach{ case (partition, partitionStateInfo) =>
+        partition.makeLeader(controllerId, partitionStateInfo, correlationId)}
 
       // Finally add these partitions to the list of partitions for which the leader is the current broker
       leaderPartitionsLock synchronized {
-        leaderPartitions ++= partitionLeaderISRAndControllerEpochs.keySet
+        leaderPartitions ++= partitionState.keySet
       }
     } catch {
       case e: Throwable =>
@@ -305,7 +307,8 @@ class ReplicaManager(val config: KafkaConfig,
 
     stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "for the become-leader transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+      .format(localBrokerId, correlationId, controllerId, epoch,
+      partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
   }
 
   /*
@@ -325,30 +328,33 @@ class ReplicaManager(val config: KafkaConfig,
    * the error message will be set on each partition since we do not know which partition caused it
    *  TODO: the above may need to be fixed later
    */
-  private def makeFollowers(controllerId: Int, epoch: Int, partitionLeaderISRAndControllerEpochs: Map[Partition, LeaderIsrAndControllerEpoch],
+  private def makeFollowers(controllerId: Int, epoch: Int,
+                            partitionState: Map[Partition, PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
     stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "starting the become-follower transition for partitions %s")
       .format(localBrokerId, correlationId, controllerId, epoch,
-      partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+      partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
 
-    for (partition <- partitionLeaderISRAndControllerEpochs.keys)
+    for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
 
     try {
-      replicaFetcherManager.removeFetcherForPartitions(partitionLeaderISRAndControllerEpochs.keySet.map(new TopicAndPartition(_)))
+      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
       stateChangeLogger.trace("Broker %d stopped fetchers for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
-      logManager.truncateTo(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
+      logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
         new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
       })
       stateChangeLogger.trace("Broker %d truncated logs and checkpoint recovery boundaries for partitions %s as per becoming-follower request from controller %d epoch %d"
-        .format(localBrokerId, partitionLeaderISRAndControllerEpochs.keySet.mkString(","), controllerId, correlationId))
+        .format(localBrokerId, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(","), controllerId, correlationId))
 
       if (!isShuttingDown.get()) {
-        replicaFetcherManager.addFetcherForPartitions(partitionLeaderISRAndControllerEpochs.map{ case(partition, leaderISRAndControllerEpoch) =>
-          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == leaderISRAndControllerEpoch.leaderAndIsr.leader).get, partition.getReplica().get.logEndOffset)}
+        replicaFetcherManager.addFetcherForPartitions(partitionState.map{ case(partition, partitionStateInfo) =>
+          new TopicAndPartition(partition) ->
+            BrokerAndInitialOffset(leaders.find(_.id == partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader).get,
+              partition.getReplica().get.logEndOffset)}
         )
       }
       else {
@@ -357,10 +363,11 @@ class ReplicaManager(val config: KafkaConfig,
           .format(localBrokerId, correlationId, controllerId, epoch))
       }
 
-      partitionLeaderISRAndControllerEpochs.foreach{ case (partition, leaderIsrAndControllerEpoch) => partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+      partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
+        partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
 
       leaderPartitionsLock synchronized {
-        leaderPartitions --= partitionLeaderISRAndControllerEpochs.keySet
+        leaderPartitions --= partitionState.keySet
       }
     } catch {
       case e: Throwable =>
@@ -373,7 +380,7 @@ class ReplicaManager(val config: KafkaConfig,
 
     stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
       "for the become-follower transition for partitions %s")
-      .format(localBrokerId, correlationId, controllerId, epoch, partitionLeaderISRAndControllerEpochs.keySet.mkString(",")))
+      .format(localBrokerId, correlationId, controllerId, epoch, partitionState.keySet.map(p => TopicAndPartition(p.topic, p.partitionId)).mkString(",")))
   }
 
   private def maybeShrinkIsr(): Unit = {
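
The leader/follower split driven by the new partitionState map is just a filter on the
leader id carried in each PartitionStateInfo. A minimal sketch with a hypothetical
stand-in type:

    // Hypothetical stand-in for kafka.api.PartitionStateInfo: only the leader id matters here.
    case class PartitionInfo(leader: Int)

    // A partition is to be led locally iff its leader equals this broker's id; everything
    // else received in the LeaderAndIsrRequest becomes a follower on this broker.
    def splitByRole(localBrokerId: Int, partitionState: Map[String, PartitionInfo])
        : (Map[String, PartitionInfo], Map[String, PartitionInfo]) = {
      val toBeLeader   = partitionState.filter { case (_, info) => info.leader == localBrokerId }
      val toBeFollower = partitionState -- toBeLeader.keys
      (toBeLeader, toBeFollower)
    }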

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/kafka-patch-review.py
----------------------------------------------------------------------
diff --git a/kafka-patch-review.py b/kafka-patch-review.py
index daf2c35..7fa6cb5 100644
--- a/kafka-patch-review.py
+++ b/kafka-patch-review.py
@@ -95,12 +95,12 @@ def main():
 
   comment="Created reviewboard " 
   if not opt.reviewboard:
-    print 'Created a new reviewboard ',rb_url,' against branch ',opt.branch
+    print 'Created a new reviewboard ',rb_url,
   else:
-    print 'Updated reviewboard',opt.reviewboard
+    print 'Updated reviewboard'
     comment="Updated reviewboard "
 
-  comment = comment + rb_url 
+  comment = comment + rb_url + ' against branch ' + opt.branch 
   jira.add_comment(opt.jira, comment)
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/kafka/blob/df7f7a25/project/Build.scala
----------------------------------------------------------------------
diff --git a/project/Build.scala b/project/Build.scala
index bcd1ca5..40e0c4f 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -44,7 +44,7 @@ object KafkaBuild extends Build {
     crossScalaVersions := Seq("2.8.0","2.8.2", "2.9.1", "2.9.2", "2.10.1"),
     excludeFilter in unmanagedSources <<= scalaVersion(v => if (v.startsWith("2.8")) "*_2.9+.scala" else "*_2.8.scala"),
     scalaVersion := "2.8.0",
-    version := "0.8.0",
+    version := "0.8.1",
     publishTo := Some("Apache Maven Repo" at "https://repository.apache.org/service/local/staging/deploy/maven2"),
     credentials += Credentials(Path.userHome / ".m2" / ".credentials"),
     buildNumber := System.getProperty("build.number", ""),