Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/07 05:18:38 UTC

[2/3] KAFKA-330 Delete topic; reviewed by Jun Rao, Guozhang Wang and Joel Koshy

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 483559a..613aec6 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -23,6 +23,8 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException}
 import kafka.utils.{ZkUtils, Logging}
 import org.I0Itec.zkclient.IZkChildListener
 import org.apache.log4j.Logger
+import kafka.controller.Callbacks._
+import kafka.utils.Utils._
 
 /**
  * This class represents the state machine for replicas. It defines the states that a replica can be in, and
@@ -35,15 +37,19 @@ import org.apache.log4j.Logger
 *                        Valid previous states are NewReplica, OnlineReplica or OfflineReplica
  * 3. OfflineReplica    : If a replica dies, it moves to this state. This happens when the broker hosting the replica
 *                        is down. Valid previous states are NewReplica, OnlineReplica
- * 4. NonExistentReplica: If a replica is deleted, it is moved to this state. Valid previous state is OfflineReplica
+ * 4. ReplicaDeletionStarted: If replica deletion starts, it is moved to this state. Valid previous state is OfflineReplica
+ * 5. ReplicaDeletionSuccessful: If a replica responds with no error code in response to a delete replica request, it is
+ *                        moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 6. ReplicaDeletionFailed: If replica deletion fails, it is moved to this state. Valid previous state is ReplicaDeletionStarted
+ * 7. NonExistentReplica: If a replica is deleted successfully, it is moved to this state. Valid previous state is
+ *                        ReplicaDeletionSuccessful
  */
 class ReplicaStateMachine(controller: KafkaController) extends Logging {
   private val controllerContext = controller.controllerContext
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
-  var replicaState: mutable.Map[(String, Int, Int), ReplicaState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
-    controllerId, controller.clientId)
+  var replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   this.logIdent = "[Replica state machine on controller " + controller.config.brokerId + "]: "
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
@@ -81,14 +87,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * @param targetState  The state that the replicas should be moved to
    * The controller's allLeaders cache should have been updated before this
    */
-  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState) {
-    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
-    try {
-      brokerRequestBatch.newBatch()
-      replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState))
-      brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
-    }catch {
-      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
+  def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
+                         callbacks: Callbacks = (new CallbackBuilder).build) {
+    if(replicas.size > 0) {
+      info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
+      try {
+        brokerRequestBatch.newBatch()
+        replicas.foreach(r => handleStateChange(r, targetState, callbacks))
+        brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
+      }catch {
+        case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
+      }
     }
   }
 
@@ -96,38 +105,53 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
    * This API exercises the replica's state machine. It ensures that every state transition happens from a legal
    * previous state to the target state. Valid state transitions are:
    * NonExistentReplica --> NewReplica
-   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
+   *   partition to every live broker
    *
    * NewReplica -> OnlineReplica
    * --add the new replica to the assigned replica list if needed
    *
    * OnlineReplica,OfflineReplica -> OnlineReplica
-   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+   * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the
+   *   partition to every live broker
    *
-   * NewReplica,OnlineReplica -> OfflineReplica
+   * NewReplica,OnlineReplica,OfflineReplica,ReplicaDeletionFailed -> OfflineReplica
    * --send StopReplicaRequest to the replica (w/o deletion)
-   * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker.
+   * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and
+   *   UpdateMetadata request for the partition to every live broker.
    *
-   * OfflineReplica -> NonExistentReplica
+   * OfflineReplica -> ReplicaDeletionStarted
    * --send StopReplicaRequest to the replica (with deletion)
    *
-   * @param topic       The topic of the replica for which the state transition is invoked
-   * @param partition   The partition of the replica for which the state transition is invoked
-   * @param replicaId   The replica for which the state transition is invoked
+   * ReplicaDeletionStarted -> ReplicaDeletionSuccessful
+   * -- mark the state of the replica in the state machine
+   *
+   * ReplicaDeletionStarted -> ReplicaDeletionFailed
+   * -- mark the state of the replica in the state machine
+   *
+   * ReplicaDeletionSuccessful -> NonExistentReplica
+   * -- remove the replica from the in memory partition replica assignment cache
+   *
+   *
+   * @param partitionAndReplica The replica for which the state transition is invoked
    * @param targetState The end state that the replica should be moved to
    */
-  def handleStateChange(topic: String, partition: Int, replicaId: Int, targetState: ReplicaState) {
+  def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState,
+                        callbacks: Callbacks) {
+    val topic = partitionAndReplica.topic
+    val partition = partitionAndReplica.partition
+    val replicaId = partitionAndReplica.replica
     val topicAndPartition = TopicAndPartition(topic, partition)
     if (!hasStarted.get)
       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change of replica %d for partition %s " +
                                             "to %s failed because replica state machine has not started")
                                               .format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))
     try {
-      replicaState.getOrElseUpdate((topic, partition, replicaId), NonExistentReplica)
+      replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
       val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
       targetState match {
         case NewReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(NonExistentReplica), targetState)
+          assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
           // start replica as a follower to the current leader for its partition
           val leaderIsrAndControllerEpochOpt = ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition)
           leaderIsrAndControllerEpochOpt match {
@@ -140,22 +164,39 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                                                                   replicaAssignment)
             case None => // new leader request will be sent to this replica when one gets elected
           }
-          replicaState.put((topic, partition, replicaId), NewReplica)
+          replicaState.put(partitionAndReplica, NewReplica)
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NewReplica"
                                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
-        case NonExistentReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
+        case ReplicaDeletionStarted =>
+          assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
+          replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
           // send stop replica command
-          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true)
+          brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
+            callbacks.stopReplicaResponseCallback)
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionStarted"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+        case ReplicaDeletionFailed =>
+          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
+          replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionFailed"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+        case ReplicaDeletionSuccessful =>
+          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
+          replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
+          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to ReplicaDeletionSuccessful"
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+        case NonExistentReplica =>
+          assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
           // remove this replica from the assigned replicas list for its partition
           val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
           controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
-          replicaState.remove((topic, partition, replicaId))
+          replicaState.remove(partitionAndReplica)
           stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to NonExistentReplica"
-                                    .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+            .format(controllerId, controller.epoch, replicaId, topicAndPartition))
         case OnlineReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
-          replicaState((topic, partition, replicaId)) match {
+          assertValidPreviousStates(partitionAndReplica,
+            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
+          replicaState(partitionAndReplica) match {
             case NewReplica =>
               // add this replica to the assigned replicas list for its partition
               val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
@@ -169,17 +210,17 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                 case Some(leaderIsrAndControllerEpoch) =>
                   brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
                     replicaAssignment)
-                  replicaState.put((topic, partition, replicaId), OnlineReplica)
+                  replicaState.put(partitionAndReplica, OnlineReplica)
                   stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
                     .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                 case None => // that means the partition was never in OnlinePartition state, this means the broker never
                   // started a log for that partition and does not have a high watermark value for this partition
               }
-
           }
-          replicaState.put((topic, partition, replicaId), OnlineReplica)
+          replicaState.put(partitionAndReplica, OnlineReplica)
         case OfflineReplica =>
-          assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
+          assertValidPreviousStates(partitionAndReplica,
+            List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed), targetState)
           // send stop replica command to the replica so that it stops fetching from the leader
           brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
           // As an optimization, the controller removes dead replicas from the ISR
@@ -191,7 +232,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                     // send the shrunk ISR state change request only to the leader
                     brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
                       topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
-                    replicaState.put((topic, partition, replicaId), OfflineReplica)
+                    replicaState.put(partitionAndReplica, OfflineReplica)
                     stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica"
                       .format(controllerId, controller.epoch, replicaId, topicAndPartition))
                     false
@@ -214,12 +255,34 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
-  private def assertValidPreviousStates(topic: String, partition: Int, replicaId: Int, fromStates: Seq[ReplicaState],
+  def areAllReplicasForTopicDeleted(topic: String): Boolean = {
+    val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
+    val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
+    debug("Are all replicas for topic %s deleted %s".format(topic, replicaStatesForTopic))
+    replicaStatesForTopic.foldLeft(true)((deletionState, r) => deletionState && r._2 == ReplicaDeletionSuccessful)
+  }
+
+  def isAtLeastOneReplicaInDeletionStartedState(topic: String): Boolean = {
+    val replicasForTopic = controller.controllerContext.replicasForTopic(topic)
+    val replicaStatesForTopic = replicasForTopic.map(r => (r, replicaState(r))).toMap
+    replicaStatesForTopic.foldLeft(false)((deletionState, r) => deletionState || r._2 == ReplicaDeletionStarted)
+  }
+
+  def replicasInState(topic: String, state: ReplicaState): Set[PartitionAndReplica] = {
+    replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet
+  }
+
+  def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
+    val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionFailed)
+    replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet
+  }
+
+  private def assertValidPreviousStates(partitionAndReplica: PartitionAndReplica, fromStates: Seq[ReplicaState],
                                         targetState: ReplicaState) {
-    assert(fromStates.contains(replicaState((topic, partition, replicaId))),
-      "Replica %s for partition [%s,%d] should be in the %s states before moving to %s state"
-        .format(replicaId, topic, partition, fromStates.mkString(","), targetState) +
-        ". Instead it is in %s state".format(replicaState((topic, partition, replicaId))))
+    assert(fromStates.contains(replicaState(partitionAndReplica)),
+      "Replica %s should be in the %s states before moving to %s state"
+        .format(partitionAndReplica, fromStates.mkString(","), targetState) +
+        ". Instead it is in %s state".format(replicaState(partitionAndReplica)))
   }
 
   private def registerBrokerChangeListener() = {
@@ -235,14 +298,23 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
       val topic = topicPartition.topic
       val partition = topicPartition.partition
       assignedReplicas.foreach { replicaId =>
+        val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
         controllerContext.liveBrokerIds.contains(replicaId) match {
-          case true => replicaState.put((topic, partition, replicaId), OnlineReplica)
-          case false => replicaState.put((topic, partition, replicaId), OfflineReplica)
+          case true => replicaState.put(partitionAndReplica, OnlineReplica)
+          case false =>
+            // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
+            // This is required because a broker can go down during controller failover, so the replicas on that
+            // broker should be moved to ReplicaDeletionFailed to be on the safe side.
+            replicaState.put(partitionAndReplica, ReplicaDeletionFailed)
         }
       }
     }
   }
 
+  def partitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
+    controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
+  }
+
   /**
    * This is the zookeeper listener that triggers all the state transitions for a replica
    */
@@ -250,7 +322,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     this.logIdent = "[BrokerChangeListener on Controller " + controller.config.brokerId + "]: "
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
       info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         if (hasStarted.get) {
           ControllerStats.leaderElectionTimer.time {
             try {
@@ -282,6 +354,9 @@ sealed trait ReplicaState { def state: Byte }
 case object NewReplica extends ReplicaState { val state: Byte = 1 }
 case object OnlineReplica extends ReplicaState { val state: Byte = 2 }
 case object OfflineReplica extends ReplicaState { val state: Byte = 3 }
-case object NonExistentReplica extends ReplicaState { val state: Byte = 4 }
+case object ReplicaDeletionStarted extends ReplicaState { val state: Byte = 4}
+case object ReplicaDeletionSuccessful extends ReplicaState { val state: Byte = 5}
+case object ReplicaDeletionFailed extends ReplicaState { val state: Byte = 6}
+case object NonExistentReplica extends ReplicaState { val state: Byte = 7 }
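
For readers tracing the state machine changes above: the handleStateChange cases amount to a fixed table of legal transitions, enforced by assertValidPreviousStates. The following is a minimal illustrative sketch (not part of the patch) of that table; it assumes it compiles against the kafka.controller package as modified by this commit:

    import kafka.controller._

    // Legal previous states for each target replica state, mirroring the
    // assertValidPreviousStates calls in handleStateChange above.
    object ReplicaTransitionTable {
      val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
        NewReplica                -> Set(NonExistentReplica),
        OnlineReplica             -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed),
        OfflineReplica            -> Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionFailed),
        ReplicaDeletionStarted    -> Set(OfflineReplica),
        ReplicaDeletionSuccessful -> Set(ReplicaDeletionStarted),
        ReplicaDeletionFailed     -> Set(ReplicaDeletionStarted),
        NonExistentReplica        -> Set(ReplicaDeletionSuccessful)
      )

      // True if moving a replica from `from` to `to` would pass the assertion.
      def isLegal(from: ReplicaState, to: ReplicaState): Boolean =
        validPreviousStates.getOrElse(to, Set.empty).contains(from)
    }

Note that ReplicaDeletionFailed replicas can re-enter OfflineReplica (and from there ReplicaDeletionStarted), which is what makes the delete retries in TopicDeletionManager below possible.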
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
new file mode 100644
index 0000000..91a446d
--- /dev/null
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.controller
+
+import collection.mutable
+import kafka.utils.{ShutdownableThread, Logging, ZkUtils}
+import kafka.utils.Utils._
+import collection.Set
+import kafka.common.{ErrorMapping, TopicAndPartition}
+import kafka.api.{StopReplicaResponse, RequestOrResponse}
+
+/**
+ * This manages the state machine for topic deletion.
+ * 1. TopicCommand issues topic deletion by creating a new admin path /admin/delete_topics/<topic>
+ * 2. The controller listens for child changes on /admin/delete_topics and starts topic deletion for the respective topics
+ * 3. The controller has a background thread that handles topic deletion. The purpose of having this background thread
+ *    is to accommodate the TTL feature, when we have it. This thread is signaled whenever deletion for a topic needs to
+ *    be started or resumed. Currently, a topic's deletion can be started only by the onPartitionDeletion callback on the
+ *    controller. In the future, it can be triggered based on the configured TTL for the topic. A topic's deletion will
+ *    be halted in the following scenarios -
+ *    3.1 broker hosting one of the replicas for that topic goes down
+ *    3.2 partition reassignment for partitions of that topic is in progress
+ *    3.3 preferred replica election for partitions of that topic is in progress
+ *    (though this is not strictly required since it holds the controller lock for the entire duration from start to end)
+ * 4. Topic deletion is resumed when -
+ *    4.1 broker hosting one of the replicas for that topic is started
+ *    4.2 preferred replica election for partitions of that topic completes
+ *    4.3 partition reassignment for partitions of that topic completes
+ * 5. Every replica for a topic being deleted is in one of the 3 states -
+ *    5.1 ReplicaDeletionStarted (a replica enters the ReplicaDeletionStarted state when the onPartitionDeletion callback is
+ *        invoked. This happens when the child change watch for /admin/delete_topics fires on the controller. As part of this
+ *        state change, the controller sends StopReplicaRequests to all replicas. It registers a callback for the
+ *        StopReplicaResponse when deletePartition=true, so that a callback is invoked when a response to the delete-replica
+ *        request is received from every replica)
+ *    5.2 ReplicaDeletionSuccessful (deleteTopicStopReplicaCallback() moves replicas from
+ *        ReplicaDeletionStarted->ReplicaDeletionSuccessful depending on the error codes in StopReplicaResponse)
+ *    5.3 ReplicaDeletionFailed (deleteTopicStopReplicaCallback() moves replicas from
+ *        ReplicaDeletionStarted->ReplicaDeletionFailed depending on the error codes in StopReplicaResponse.
+ *        In general, if a broker dies and it hosted replicas for topics being deleted, the controller marks the
+ *        respective replicas as ReplicaDeletionFailed in the onBrokerFailure callback. The reason is that if a
+ *        broker fails before the request is sent and after the replica is in ReplicaDeletionStarted state,
+ *        it is possible that the replica will mistakenly remain in ReplicaDeletionStarted state and topic deletion
+ *        will not be retried when the broker comes back up.)
+ * 6. The delete topic thread marks a topic successfully deleted only if all replicas are in ReplicaDeletionSuccessful
+ *    state and it starts the topic deletion teardown mode where it deletes all topic state from the controllerContext
+ *    as well as from zookeeper. This is the only time the /brokers/topics/<topic> path gets deleted. On the other hand,
+ *    if no replica is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionFailed state, then
+ *    the topic is marked for deletion retry.
+ * @param controller
+ * @param initialTopicsToBeDeleted The topics that are queued up for deletion in zookeeper at the time of controller failover
+ * @param initialHaltedTopicsForDeletion The topics for which deletion is halted due to any of the conditions mentioned in #3 above
+ */
+class TopicDeletionManager(controller: KafkaController,
+                           initialTopicsToBeDeleted: Set[String] = Set.empty,
+                           initialHaltedTopicsForDeletion: Set[String] = Set.empty) extends Logging {
+  val controllerContext = controller.controllerContext
+  val partitionStateMachine = controller.partitionStateMachine
+  val replicaStateMachine = controller.replicaStateMachine
+  var topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
+  var haltedTopicsForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
+    (initialHaltedTopicsForDeletion & initialTopicsToBeDeleted)
+  val deleteTopicsCond = controllerContext.controllerLock.newCondition()
+  var deleteTopicStateChanged: Boolean = false
+  var deleteTopicsThread: DeleteTopicsThread = null
+
+  /**
+   * Invoked at the end of new controller initiation
+   */
+  def start() {
+    deleteTopicsThread = new DeleteTopicsThread()
+    deleteTopicStateChanged = true
+    deleteTopicsThread.start()
+  }
+
+  /**
+   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
+   */
+  def shutdown() {
+    deleteTopicsThread.shutdown()
+    topicsToBeDeleted.clear()
+    haltedTopicsForDeletion.clear()
+  }
+
+  /**
+   * Invoked by the child change listener on /admin/delete_topics to queue up the topics for deletion. The topic gets added
+   * to the topicsToBeDeleted list and only gets removed from the list when the topic deletion has completed successfully
+   * i.e. all replicas of all partitions of that topic are deleted successfully.
+   * @param topics Topics that should be deleted
+   */
+  def enqueueTopicsForDeletion(topics: Set[String]) {
+    topicsToBeDeleted ++= topics
+    resumeTopicDeletionThread()
+  }
+
+  /**
+   * Invoked when any event that can possibly resume topic deletion occurs. These events include -
+   * 1. New broker starts up. Any replicas belonging to topics queued up for deletion can be deleted since the broker is up
+   * 2. Partition reassignment completes. Any partitions belonging to topics queued up for deletion that finished reassignment can now be deleted
+   * 3. Preferred replica election completes. Any partitions belonging to topics queued up for deletion that finished
+   *    preferred replica election can now be deleted
+   * @param topics Topics for which deletion can be resumed
+   */
+  def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
+    val topicsToResumeDeletion = topics & topicsToBeDeleted
+    if(topicsToResumeDeletion.size > 0) {
+      haltedTopicsForDeletion --= topicsToResumeDeletion
+      resumeTopicDeletionThread()
+    }
+  }
+
+  /**
+   * Invoked when a broker that hosts replicas for topics to be deleted goes down. Also invoked when the callback for
+   * StopReplicaResponse receives an error code for the replicas of a topic to be deleted. As part of this, the replicas
+   * are moved from ReplicaDeletionStarted to ReplicaDeletionFailed state. Also, the topic is added to the list of topics
+   * for which deletion is halted until further notice. The delete topic thread is notified so it can retry topic deletion
+   * if it has received a response for all replicas of a topic to be deleted
+   * @param replicas Replicas for which deletion has failed
+   */
+  def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
+    val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+    if(replicasThatFailedToDelete.size > 0) {
+      val topics = replicasThatFailedToDelete.map(_.topic)
+      debug("Deletion failed for replicas %s. Halting deletion for topics %s"
+        .format(replicasThatFailedToDelete.mkString(","), topics))
+      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionFailed)
+      haltTopicDeletion(topics)
+      resumeTopicDeletionThread()
+    }
+  }
+
+  /**
+   * Halt topic deletion if -
+   * 1. replicas are down
+   * 2. partition reassignment is in progress for some partitions of the topic
+   * 3. preferred replica election is in progress for some partitions of the topic
+   * @param topics Topics for which deletion should be halted. No op if the topic was not previously queued up for deletion
+   */
+  def haltTopicDeletion(topics: Set[String]) {
+    val newTopicsToHaltDeletion = topicsToBeDeleted & topics
+    haltedTopicsForDeletion ++= newTopicsToHaltDeletion
+    if(newTopicsToHaltDeletion.size > 0)
+      info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+  }
+
+  def isTopicDeletionHalted(topic: String): Boolean = {
+    haltedTopicsForDeletion.contains(topic)
+  }
+
+  def isTopicDeletionInProgress(topic: String): Boolean = {
+    controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+  }
+
+  def isTopicQueuedUpForDeletion(topic: String): Boolean = {
+    topicsToBeDeleted.contains(topic)
+  }
+
+  /**
+   * Invoked by the delete-topic-thread to wait until events that either trigger, restart or halt topic deletion occur.
+   * controllerLock should be acquired before invoking this API
+   */
+  private def awaitTopicDeletionNotification() {
+    while(!deleteTopicStateChanged) {
+      info("Waiting for signal to start or continue topic deletion")
+      deleteTopicsCond.await()
+    }
+    deleteTopicStateChanged = false
+  }
+
+  /**
+   * Signals the delete-topic-thread to process topic deletion
+   */
+  private def resumeTopicDeletionThread() {
+    deleteTopicStateChanged = true
+    deleteTopicsCond.signal()
+  }
+
+  /**
+   * Invoked by the StopReplicaResponse callback when it receives no error code for a replica of a topic to be deleted.
+   * As part of this, the replicas are moved from ReplicaDeletionStarted to ReplicaDeletionSuccessful state. The delete
+   * topic thread is notified so it can tear down the topic if all replicas of a topic have been successfully deleted
+   * @param replicas Replicas that were successfully deleted by the broker
+   */
+  private def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
+    val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+    debug("Deletion successfully completed for replicas %s".format(successfullyDeletedReplicas.mkString(",")))
+    controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
+    resumeTopicDeletionThread()
+  }
+
+  /**
+   * Topic deletion can be retried if -
+   * 1. Topic deletion is not already complete
+   * 2. Topic deletion is currently not in progress for that topic
+   * 3. Topic deletion is currently not halted for that topic
+   * @param topic Topic
+   * @return Whether or not deletion can be retried for the topic
+   */
+  private def isTopicEligibleForDeletion(topic: String): Boolean = {
+    topicsToBeDeleted.contains(topic) && (!isTopicDeletionInProgress(topic) && !isTopicDeletionHalted(topic))
+  }
+
+  /**
+   * If the topic is queued for deletion but deletion is not currently in progress, then deletion is retried for that topic.
+   * To ensure a successful retry, reset states for the respective replicas from ReplicaDeletionFailed to OfflineReplica state.
+   * @param topic Topic for which deletion should be retried
+   */
+  private def markTopicForDeletionRetry(topic: String) {
+    // reset replica states from ReplicaDeletionFailed to OfflineReplica
+    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
+  }
+
+  private def completeDeleteTopic(topic: String) {
+    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+    // controller will remove this replica from the state machine as well as its partition assignment cache
+    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
+    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
+    // move respective partition to OfflinePartition and NonExistentPartition state
+    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
+    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
+    topicsToBeDeleted -= topic
+    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
+    controllerContext.zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+    controllerContext.zkClient.delete(ZkUtils.getDeleteTopicPath(topic))
+    controllerContext.removeTopic(topic)
+  }
+
+  /**
+   * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
+   * It invokes the delete partition callback for all partitions of a topic
+   */
+  private def onTopicDeletion(topics: Set[String]) {
+    info("Topic deletion callback for %s".format(topics.mkString(",")))
+    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
+    topics.foreach { topic =>
+      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
+    }
+  }
+
+  /**
+   * Invoked by the onPartitionDeletion callback. It is the 2nd step of topic deletion, the first being sending
+   * UpdateMetadata requests to all brokers to start rejecting requests for deleted topics. As part of starting deletion,
+   * the topics are added to the in progress list. As long as a topic is in the in progress list, deletion for that topic
+   * is never retried. A topic is removed from the in progress list when
+   * 1. Either the topic is successfully deleted OR
+   * 2. No replica for the topic is in ReplicaDeletionStarted state and at least one replica is in ReplicaDeletionFailed state
+   * If the topic is queued for deletion but deletion is not currently in progress, then deletion is retried for that topic
+   * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
+   * the replicas a StopReplicaRequest (delete=true)
+   * This callback does the following things -
+   * 1. Send metadata request to all brokers excluding the topics to be deleted
+   * 2. Move all dead replicas directly to ReplicaDeletionFailed state. Also halt the deletion of respective topics if
+   *    some replicas are dead since it won't complete successfully anyway
+   * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
+   * @param replicasForTopicsToBeDeleted Replicas of partitions of topics that are queued up for deletion
+   */
+  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
+    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
+      // send update metadata so that brokers stop serving data
+      controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
+      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
+      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
+      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
+      // move dead replicas directly to failed state
+      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionFailed)
+      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
+      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
+      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
+      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
+        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
+      if(deadReplicasForTopic.size > 0)
+        haltTopicDeletion(Set(topic))
+    }
+  }
+
+  /**
+   * This callback is invoked by the delete topic callback with the list of partitions for topics to be deleted
+   * It does the following -
+   * 1. Send UpdateMetadataRequest to all live brokers (that are not shutting down) with all partitions except those for
+   *    which the topics are being deleted. The brokers start rejecting all client requests with UnknownTopicOrPartitionException
+   * 2. Move all replicas for the partitions to OfflineReplica state. This will send StopReplicaRequest to the replicas
+   *    and LeaderAndIsrRequest to the leader with the shrunk ISR. When the leader replica itself is moved to OfflineReplica state,
+   *    it will skip sending the LeaderAndIsrRequest since the leader will be updated to -1
+   * 3. Move all replicas to ReplicaDeletionStarted state. This will send StopReplicaRequest with deletePartition=true. And
+   *    will delete all persistent data from all replicas of the respective partitions
+   */
+  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
+    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
+    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
+    startReplicaDeletion(replicasPerPartition)
+  }
+
+  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: RequestOrResponse, replicaId: Int) {
+    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
+    debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
+    val partitionsInError = if(stopReplicaResponse.errorCode != ErrorMapping.NoError) {
+      stopReplicaResponse.responseMap.keySet
+    } else
+      stopReplicaResponse.responseMap.filter(p => p._2 != ErrorMapping.NoError).map(_._1).toSet
+    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
+    inLock(controllerContext.controllerLock) {
+      // move all the failed replicas to ReplicaDeletionFailed
+      failReplicaDeletion(replicasInError)
+      if(replicasInError.size != stopReplicaResponse.responseMap.size) {
+        // some replicas could have been successfully deleted
+        val deletedReplicas = stopReplicaResponse.responseMap.keySet -- partitionsInError
+        completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
+      }
+    }
+  }
+
+  class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
+    val zkClient = controllerContext.zkClient
+    override def doWork() {
+      inLock(controllerContext.controllerLock) {
+        awaitTopicDeletionNotification()
+        val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
+        if(topicsQueuedForDeletion.size > 0)
+          info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
+        topicsQueuedForDeletion.foreach { topic =>
+          // if all replicas are marked as deleted successfully, then topic deletion is done
+          if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
+            // clear up all state for this topic from controller cache and zookeeper
+            completeDeleteTopic(topic)
+            info("Deletion of topic %s successfully completed".format(topic))
+          } else {
+            if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
+              // ignore since topic deletion is in progress
+              val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
+              val replicaIds = replicasInDeletionStartedState.map(_.replica)
+              val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
+              info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
+                partitions.mkString(","), topic))
+            } else {
+              // if we are here, then no replica is in ReplicaDeletionStarted state and not all replicas are in
+              // ReplicaDeletionSuccessful state. That means there is at least one failed replica, so topic deletion
+              // should be retried
+              val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionFailed)
+              // mark topic for deletion retry
+              markTopicForDeletionRetry(topic)
+              info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
+                .format(topic, replicasInTopicDeletionFailedState.mkString(",")))
+            }
+          }
+          // Try to delete the topic if it is eligible for deletion.
+          if(isTopicEligibleForDeletion(topic)) {
+            info("Deletion of topic %s (re)started".format(topic))
+            // topic deletion will be kicked off
+            onTopicDeletion(Set(topic))
+          } else if(isTopicDeletionHalted(topic)) {
+            info("Not retrying deletion of topic %s at this time since it is halted".format(topic))
+          }
+        }
+      }
+    }
+  }
+}
+
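
The DeleteTopicsThread above blocks on a condition created from the controller lock (deleteTopicsCond) and is woken by resumeTopicDeletionThread(); the deleteTopicStateChanged flag keeps a wake-up that arrives before the await from being lost. A self-contained sketch of that await/signal idiom (plain java.util.concurrent, not the Kafka classes, so the names here are illustrative):

    import java.util.concurrent.locks.ReentrantLock

    object AwaitSignalSketch {
      private val lock = new ReentrantLock()
      private val cond = lock.newCondition()
      private var stateChanged = false   // guarded by lock, like deleteTopicStateChanged

      // Caller must hold the lock, as DeleteTopicsThread.doWork does via inLock().
      def awaitNotification(): Unit = {
        while (!stateChanged)
          cond.await()
        stateChanged = false
      }

      // Caller must hold the lock; a signal sent before anyone waits is not lost
      // because the flag stays true until a waiter consumes it.
      def signalNotification(): Unit = {
        stateChanged = true
        cond.signal()
      }

      def main(args: Array[String]): Unit = {
        val worker = new Thread(new Runnable {
          def run(): Unit = {
            lock.lock()
            try { awaitNotification(); println("woken up: handle topic deletion") }
            finally lock.unlock()
          }
        })
        worker.start()
        lock.lock()
        try signalNotification() finally lock.unlock()
        worker.join()
      }
    }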

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/network/BlockingChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala b/core/src/main/scala/kafka/network/BlockingChannel.scala
index d22dabd..ab04b3f 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -76,7 +76,7 @@ class BlockingChannel( val host: String,
       // but let's do it to be sure.
       swallow(channel.close())
       swallow(channel.socket.close())
-      swallow(readChannel.close())
+      if(readChannel != null) swallow(readChannel.close())
       channel = null; readChannel = null; writeChannel = null
       connected = false
     }
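
The one-line change above guards disconnect() against a channel that was never fully set up, e.g. when a connection attempt fails before readChannel is assigned (an inference from the surrounding code, not something the patch states). A minimal sketch of the same defensive-close idiom, in the spirit of the existing swallow(...) calls:

    import java.nio.channels.Channel

    // Close a possibly-null channel and keep cleanup failures from escaping.
    def closeQuietly(c: Channel): Unit =
      if (c != null) {
        try c.close()
        catch { case e: Exception => println("Ignoring error on close: " + e) }
      }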

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bd7940b..c56ad50 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -23,9 +23,7 @@ import kafka.message._
 import kafka.network._
 import kafka.log._
 import kafka.utils.ZKGroupTopicDirs
-import org.apache.log4j.Logger
 import scala.collection._
-import java.util.Properties
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic._
 import kafka.metrics.KafkaMetricsGroup
@@ -54,7 +52,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   private val delayedRequestMetrics = new DelayedRequestMetrics
   /* following 3 data structures are updated by the update metadata request
   * and is queried by the topic metadata request. */
-  var leaderCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
+  var metadataCache: mutable.Map[TopicAndPartition, PartitionStateInfo] =
     new mutable.HashMap[TopicAndPartition, PartitionStateInfo]()
   private val aliveBrokers: mutable.Map[Int, Broker] = new mutable.HashMap[Int, Broker]()
   private val partitionMetadataLock = new Object
@@ -87,7 +85,16 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
+  // ensureTopicExists is only for client facing requests
+  private def ensureTopicExists(topic: String) = {
+    if(!metadataCache.exists { case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic.equals(topic)} )
+      throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+  }
+
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
+    // ensureTopicExists is only for client facing requests
+    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+    // stop serving data to clients for the topic being deleted
     val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     try {
       val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest)
@@ -101,6 +108,9 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleStopReplicaRequest(request: RequestChannel.Request) {
+    // ensureTopicExists is only for client facing requests
+    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+    // stop serving data to clients for the topic being deleted
     val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     val (response, error) = replicaManager.stopReplicas(stopReplicaRequest)
     val stopReplicaResponse = new StopReplicaResponse(stopReplicaRequest.correlationId, response.toMap, error)
@@ -110,6 +120,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleUpdateMetadataRequest(request: RequestChannel.Request) {
     val updateMetadataRequest = request.requestObj.asInstanceOf[UpdateMetadataRequest]
+    // ensureTopicExists is only for client facing requests
+    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+    // stop serving data to clients for the topic being deleted
     val stateChangeLogger = replicaManager.stateChangeLogger
     if(updateMetadataRequest.controllerEpoch < replicaManager.controllerEpoch) {
       val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
@@ -124,18 +137,38 @@ class KafkaApis(val requestChannel: RequestChannel,
       // cache the list of alive brokers in the cluster
       updateMetadataRequest.aliveBrokers.foreach(b => aliveBrokers.put(b.id, b))
       updateMetadataRequest.partitionStateInfos.foreach { partitionState =>
-        leaderCache.put(partitionState._1, partitionState._2)
+        metadataCache.put(partitionState._1, partitionState._2)
         if(stateChangeLogger.isTraceEnabled)
           stateChangeLogger.trace(("Broker %d cached leader info %s for partition %s in response to UpdateMetadata request " +
             "sent by controller %d epoch %d with correlation id %d").format(brokerId, partitionState._2, partitionState._1,
             updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
       }
+      // remove the topics that don't exist in the UpdateMetadata request since those are the topics that are
+      // currently being deleted by the controller
+      val topicsKnownToThisBroker = metadataCache.map{
+        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
+      val topicsKnownToTheController = updateMetadataRequest.partitionStateInfos.map {
+        case(topicAndPartition, partitionStateInfo) => topicAndPartition.topic }.toSet
+      val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
+      val partitionsToBeDeleted = metadataCache.filter {
+        case(topicAndPartition, partitionStateInfo) => deletedTopics.contains(topicAndPartition.topic)
+      }.keySet
+      partitionsToBeDeleted.foreach { partition =>
+        metadataCache.remove(partition)
+        if(stateChangeLogger.isTraceEnabled)
+          stateChangeLogger.trace(("Broker %d deleted partition %s from metadata cache in response to UpdateMetadata request " +
+            "sent by controller %d epoch %d with correlation id %d").format(brokerId, partition,
+            updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, updateMetadataRequest.correlationId))
+      }
     }
     val updateMetadataResponse = new UpdateMetadataResponse(updateMetadataRequest.correlationId)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(updateMetadataResponse)))
   }
 
   def handleControlledShutdownRequest(request: RequestChannel.Request) {
+    // ensureTopicExists is only for client facing requests
+    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
+    // stop serving data to clients for the topic being deleted
     val controlledShutdownRequest = request.requestObj.asInstanceOf[ControlledShutdownRequest]
     val partitionsRemaining = controller.shutdownBroker(controlledShutdownRequest.brokerId)
     val controlledShutdownResponse = new ControlledShutdownResponse(controlledShutdownRequest.correlationId,
@@ -245,6 +278,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       BrokerTopicStats.getBrokerAllTopicsStats.bytesInRate.mark(messages.sizeInBytes)
 
       try {
+        ensureTopicExists(topicAndPartition.topic)
         val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
         val info =
           partitionOpt match {
@@ -347,6 +381,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
         val partitionData =
           try {
+            ensureTopicExists(topic)
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
             BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
             BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(messages.sizeInBytes)
@@ -417,6 +452,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseMap = offsetRequest.requestInfo.map(elem => {
       val (topicAndPartition, partitionOffsetRequestInfo) = elem
       try {
+        ensureTopicExists(topicAndPartition.topic)
         // ensure leader exists
         val localReplica = if(!offsetRequest.isFromDebuggingClient)
           replicaManager.getLeaderReplicaIfLocal(topicAndPartition.topic, topicAndPartition.partition)
@@ -524,18 +560,18 @@ class KafkaApis(val requestChannel: RequestChannel,
         metadataRequest.topics.toSet
       else {
         partitionMetadataLock synchronized {
-          leaderCache.keySet.map(_.topic)
+          metadataCache.keySet.map(_.topic)
         }
       }
     }
     val topicMetadataList =
       partitionMetadataLock synchronized {
         uniqueTopics.map { topic =>
-          if(leaderCache.keySet.map(_.topic).contains(topic)) {
-            val partitionStateInfo = leaderCache.filter(p => p._1.topic.equals(topic))
+          if(metadataCache.keySet.map(_.topic).contains(topic)) {
+            val partitionStateInfo = metadataCache.filter(p => p._1.topic.equals(topic))
             val sortedPartitions = partitionStateInfo.toList.sortWith((m1,m2) => m1._1.partition < m2._1.partition)
             val partitionMetadata = sortedPartitions.map { case(topicAndPartition, partitionState) =>
-              val replicas = leaderCache(topicAndPartition).allReplicas
+              val replicas = metadataCache(topicAndPartition).allReplicas
               var replicaInfo: Seq[Broker] = replicas.map(aliveBrokers.getOrElse(_, null)).filter(_ != null).toSeq
               var leaderInfo: Option[Broker] = None
               var isrInfo: Seq[Broker] = Nil
@@ -607,6 +643,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       case (topicAndPartition, metaAndError) => {
         val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
         try {
+          ensureTopicExists(topicAndPartition.topic)
           if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
             (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
           } else {
@@ -632,6 +669,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val responseInfo = offsetFetchRequest.requestInfo.map( t => {
       val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
       try {
+        ensureTopicExists(t.topic)
         val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
         payloadOpt match {
           case Some(payload) => {
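
Two themes run through the KafkaApis changes above: client-facing handlers (produce, fetch, offset requests) now call ensureTopicExists(), and handleUpdateMetadataRequest infers deleted topics by diffing the broker's metadataCache against the topics present in the UpdateMetadata request, then drops the stale partitions. A simplified, self-contained sketch of that diff-and-purge step (TopicAndPartition is redefined locally and a String value stands in for PartitionStateInfo):

    import scala.collection.mutable

    case class TopicAndPartition(topic: String, partition: Int)

    // Returns the topics the controller no longer reports and removes their
    // partitions from the local metadata cache, as handleUpdateMetadataRequest does.
    def purgeDeletedTopics(metadataCache: mutable.Map[TopicAndPartition, String],
                           partitionsInRequest: Iterable[TopicAndPartition]): Set[String] = {
      val topicsKnownToThisBroker = metadataCache.keySet.map(_.topic).toSet
      val topicsKnownToTheController = partitionsInRequest.map(_.topic).toSet
      val deletedTopics = topicsKnownToThisBroker -- topicsKnownToTheController
      val stalePartitions = metadataCache.keys.filter(tp => deletedTopics.contains(tp.topic)).toList
      stalePartitions.foreach(metadataCache.remove)
      deletedTopics
    }

In the handler above, this purge happens right after the partition state carried by the request has been applied to the cache.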

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
index 9dca55c..8c69d09 100644
--- a/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
+++ b/core/src/main/scala/kafka/server/KafkaHealthcheck.scala
@@ -20,7 +20,6 @@ package kafka.server
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
-import kafka.common._
 import java.net.InetAddress
 
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
index b5719f8..19f61a9 100644
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
@@ -19,7 +19,6 @@ package kafka.server
 import scala.collection._
 import kafka.utils.Logging
 import kafka.common._
-import java.util.concurrent.locks.ReentrantLock
 import java.io._
 
 /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f9d10d3..21bba48 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -114,7 +114,8 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {
-    stateChangeLogger.trace("Broker %d handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+    stateChangeLogger.trace("Broker %d handling stop replica (delete=%s) for partition [%s,%d]".format(localBrokerId,
+      deletePartition.toString, topic, partitionId))
     val errorCode = ErrorMapping.NoError
     getPartition(topic, partitionId) match {
       case Some(partition) =>
@@ -126,15 +127,18 @@ class ReplicaManager(val config: KafkaConfig,
           if (removedPartition != null)
             removedPartition.delete() // this will delete the local log
         }
-      case None => //do nothing if replica no longer exists
+      case None => //do nothing if replica no longer exists. This can happen during delete topic retries
+        stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
+          .format(localBrokerId, deletePartition, topic, partitionId))
     }
-    stateChangeLogger.trace("Broker %d finished handling stop replica for partition [%s,%d]".format(localBrokerId, topic, partitionId))
+    stateChangeLogger.trace("Broker %d finished handling stop replica (delete=%s) for partition [%s,%d]"
+      .format(localBrokerId, deletePartition, topic, partitionId))
     errorCode
   }
 
-  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[(String, Int), Short], Short) = {
+  def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicAndPartition, Short], Short) = {
     replicaStateChangeLock synchronized {
-      val responseMap = new collection.mutable.HashMap[(String, Int), Short]
+      val responseMap = new collection.mutable.HashMap[TopicAndPartition, Short]
       if(stopReplicaRequest.controllerEpoch < controllerEpoch) {
         stateChangeLogger.warn("Broker %d received stop replica request from an old controller epoch %d."
           .format(localBrokerId, stopReplicaRequest.controllerEpoch) +
@@ -142,14 +146,11 @@ class ReplicaManager(val config: KafkaConfig,
         (responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
         controllerEpoch = stopReplicaRequest.controllerEpoch
-        val responseMap = new HashMap[(String, Int), Short]
         // First stop fetchers for all partitions, then stop the corresponding replicas
-        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map {
-          case (topic, partition) => TopicAndPartition(topic, partition)
-        })
-        for((topic, partitionId) <- stopReplicaRequest.partitions){
-          val errorCode = stopReplica(topic, partitionId, stopReplicaRequest.deletePartitions)
-          responseMap.put((topic, partitionId), errorCode)
+        replicaFetcherManager.removeFetcherForPartitions(stopReplicaRequest.partitions.map(r => TopicAndPartition(r.topic, r.partition)))
+        for(topicAndPartition <- stopReplicaRequest.partitions){
+          val errorCode = stopReplica(topicAndPartition.topic, topicAndPartition.partition, stopReplicaRequest.deletePartitions)
+          responseMap.put(topicAndPartition, errorCode)
         }
         (responseMap, ErrorMapping.NoError)
       }
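
The reworked stopReplicas keys the response map by TopicAndPartition and, as the comment notes, removes the fetchers for all affected partitions before stopping the replicas themselves, so no fetcher keeps appending to a log that is about to be deleted when deletePartitions is set. A minimal sketch of that ordering, using placeholder collections rather than the real ReplicaManager and fetcher manager:

    object StopReplicaSketch extends App {
      case class TopicAndPartition(topic: String, partition: Int)

      val fetchers = scala.collection.mutable.Set(
        TopicAndPartition("orders", 0), TopicAndPartition("orders", 1))
      val logs = scala.collection.mutable.Map(
        TopicAndPartition("orders", 0) -> "log-0", TopicAndPartition("orders", 1) -> "log-1")

      // Stop a single replica; if deletePartition is set, also delete its local log.
      def stopReplica(tp: TopicAndPartition, deletePartition: Boolean): Int = {
        if (deletePartition) logs.remove(tp)
        0 // NoError
      }

      // Remove fetchers for all partitions first, then stop each replica,
      // collecting a per-partition error code keyed by TopicAndPartition.
      def stopReplicas(partitions: Set[TopicAndPartition],
                       deletePartitions: Boolean): Map[TopicAndPartition, Int] = {
        fetchers --= partitions
        partitions.map(tp => tp -> stopReplica(tp, deletePartitions)).toMap
      }

      println(stopReplicas(Set(TopicAndPartition("orders", 0)), deletePartitions = true))
      println(logs) // partition 0's log is gone, partition 1 untouched
    }
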
@@ -252,10 +253,10 @@ class ReplicaManager(val config: KafkaConfig,
 
         val partitionsTobeLeader = partitionState
           .filter{ case (partition, partitionStateInfo) => partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader == config.brokerId}
-        val partitionsTobeFollower = (partitionState -- partitionsTobeLeader.keys)
+        val partitionsToBeFollower = (partitionState -- partitionsTobeLeader.keys)
 
         if (!partitionsTobeLeader.isEmpty) makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, leaderAndISRRequest.correlationId, responseMap)
-        if (!partitionsTobeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsTobeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
+        if (!partitionsToBeFollower.isEmpty) makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId, responseMap)
 
         // we initialize highwatermark thread after the first leaderisrrequest. This ensures that all the partitions
         // have been completely populated before starting the checkpointing thereby avoiding weird race conditions

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/TopicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/TopicConfigManager.scala b/core/src/main/scala/kafka/server/TopicConfigManager.scala
index 42e98dd..d41fd33 100644
--- a/core/src/main/scala/kafka/server/TopicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/TopicConfigManager.scala
@@ -105,8 +105,13 @@ class TopicConfigManager(private val zkClient: ZkClient,
               log.config = logConfig
             lastExecutedChange = changeId
             info("Processed topic config change %d for topic %s, setting new config to %s.".format(changeId, topic, props))
-          } else if (now - stat.getCtime > changeExpirationMs) {
-            /* this change is now obsolete, try to delete it unless it is the last change left */
+          } else {
+            if (now - stat.getCtime > changeExpirationMs) {
+              /* this change is now obsolete, try to delete it unless it is the last change left */
+              error("Ignoring topic config change %d for topic %s since the change has expired")
+            } else {
+              error("Ignoring topic config change %d for topic %s since the topic may have been deleted")
+            }
             ZkUtils.deletePath(zkClient, changeZnode)
           }
         }
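
With the restructuring above, a config change notification that cannot be applied is logged with the reason before its znode is removed: either the notification is older than changeExpirationMs, or it refers to a topic for which no log exists any more (for example because the topic was deleted). A small sketch of that decision, using placeholder values rather than the real ZooKeeper stat and log map:

    object ConfigChangeSketch extends App {
      val changeExpirationMs = 10 * 60 * 1000L

      // Decide why an unapplicable config change notification is being dropped.
      def dropReason(now: Long, znodeCtime: Long): String =
        if (now - znodeCtime > changeExpirationMs) "the change has expired"
        else "the topic may have been deleted"

      val now = System.currentTimeMillis()
      println(dropReason(now, now - 11 * 60 * 1000L)) // the change has expired
      println(dropReason(now, now - 1000L))           // the topic may have been deleted
    }
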

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
index b189619..e5b6ff1 100644
--- a/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
+++ b/core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
@@ -17,12 +17,12 @@
 package kafka.server
 
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, SystemTime, Logging}
+import kafka.utils.Utils._
+import kafka.utils.{Json, SystemTime, Logging}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.IZkDataListener
 import kafka.controller.ControllerContext
 import kafka.controller.KafkaController
-import kafka.common.KafkaException
 
 /**
  * This class handles zookeeper based leader election based on an ephemeral path. The election module does not handle
@@ -44,7 +44,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
   val leaderChangeListener = new LeaderChangeListener
 
   def startup {
-    controllerContext.controllerLock synchronized {
+    inLock(controllerContext.controllerLock) {
       controllerContext.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
       elect
     }
@@ -102,7 +102,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      */
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         leaderId = KafkaController.parseControllerId(data.toString)
         info("New leader is %d".format(leaderId))
       }
@@ -115,7 +115,7 @@ class ZookeeperLeaderElector(controllerContext: ControllerContext,
      */
     @throws(classOf[Exception])
     def handleDataDeleted(dataPath: String) {
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader"
           .format(brokerId, dataPath))
         if(amILeader)
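
The synchronized blocks are replaced by Utils.inLock, which takes the controller lock (now presumably a java.util.concurrent.locks lock rather than a plain object monitor) and guarantees the lock is released even if the body throws. A minimal sketch of what such a helper typically looks like; this is an illustration, not necessarily Kafka's exact Utils implementation:

    import java.util.concurrent.locks.{Lock, ReentrantLock}

    object InLockSketch extends App {
      // Run `body` while holding `lock`, always releasing it in a finally block.
      def inLock[T](lock: Lock)(body: => T): T = {
        lock.lock()
        try body
        finally lock.unlock()
      }

      val controllerLock = new ReentrantLock()
      val leaderId = inLock(controllerLock) {
        // critical section: read or update controller state
        1
      }
      println(leaderId)
    }
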

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index b42e52b..fa86bb9 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -47,20 +47,24 @@ object ZkUtils extends Logging {
   val ControllerPath = "/controller"
   val ControllerEpochPath = "/controller_epoch"
   val ReassignPartitionsPath = "/admin/reassign_partitions"
+  val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
 
-  def getTopicPath(topic: String): String ={
+  def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
   }
 
-  def getTopicPartitionsPath(topic: String): String ={
+  def getTopicPartitionsPath(topic: String): String = {
     getTopicPath(topic) + "/partitions"
   }
 
   def getTopicConfigPath(topic: String): String = 
     TopicConfigPath + "/" + topic
-  
-  def getController(zkClient: ZkClient): Int= {
+
+  def getDeleteTopicPath(topic: String): String =
+    DeleteTopicsPath + "/" + topic
+
+  def getController(zkClient: ZkClient): Int = {
     readDataMaybeNull(zkClient, ControllerPath)._1 match {
       case Some(controller) => KafkaController.parseControllerId(controller)
       case None => throw new KafkaException("Controller doesn't exist")
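
The new DeleteTopicsPath constant introduces /admin/delete_topics as the parent znode for topics slated for deletion, and getDeleteTopicPath builds the per-topic child path; creating that child is presumably what signals the controller to start deleting the topic. The path construction itself is plain string concatenation, shown standalone here without a ZkClient:

    object DeleteTopicPathSketch extends App {
      val DeleteTopicsPath = "/admin/delete_topics"

      def getDeleteTopicPath(topic: String): String = DeleteTopicsPath + "/" + topic

      println(getDeleteTopicPath("orders")) // /admin/delete_topics/orders
    }
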

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 59de1b4..d5644ea 100644
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -27,6 +27,7 @@ import kafka.utils.{Logging, ZkUtils, TestUtils}
 import kafka.common.{TopicExistsException, TopicAndPartition}
 import kafka.server.{KafkaServer, KafkaConfig}
 import java.io.File
+import TestUtils._
 
 
 class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
@@ -161,9 +162,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     // in sync replicas should not have any replica that is not in the new assigned replicas
-    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
@@ -190,8 +191,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
@@ -218,8 +219,8 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }, 2000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 2, 3", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
-    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
@@ -255,12 +256,12 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     reassignPartitionsCommand.reassignPartitions
     // create brokers
     val servers = TestUtils.createBrokerConfigs(2).map(b => TestUtils.createServer(new KafkaConfig(b)))
-    TestUtils.waitUntilTrue(checkIfReassignPartitionPathExists, 1000)
+    TestUtils.waitUntilTrue(() => checkIfReassignPartitionPathExists(zkClient), 1000)
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionToBeReassigned)
     assertEquals("Partition should have been reassigned to 0, 1", newReplicas, assignedReplicas)
-    checkForPhantomInSyncReplicas(topic, partitionToBeReassigned, assignedReplicas)
+    checkForPhantomInSyncReplicas(zkClient, topic, partitionToBeReassigned, assignedReplicas)
     // ensure that there are no under replicated partitions
-    ensureNoUnderReplicatedPartitions(topic, partitionToBeReassigned, assignedReplicas, servers)
+    ensureNoUnderReplicatedPartitions(zkClient, topic, partitionToBeReassigned, assignedReplicas, servers)
     assertTrue(TestUtils.waitUntilTrue(() => getBrokersWithPartitionDir(servers, topic, 0) == newReplicas.toSet, 5000))
     servers.foreach(_.shutdown())
   }
@@ -319,9 +320,9 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     try {
       // wait for the update metadata request to trickle to the brokers
       assertTrue("Topic test not created after timeout", TestUtils.waitUntilTrue(() =>
-        activeServers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
+        activeServers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.isr.size != 3), 1000))
       assertEquals(0, partitionsRemaining.size)
-      var partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+      var partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
       var leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
       assertEquals(2, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.isr.size)
@@ -330,15 +331,15 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
       partitionsRemaining = controller.shutdownBroker(1)
       assertEquals(0, partitionsRemaining.size)
       activeServers = servers.filter(s => s.config.brokerId == 0)
-      partitionStateInfo = activeServers.head.apis.leaderCache(TopicAndPartition(topic, partition))
+      partitionStateInfo = activeServers.head.apis.metadataCache(TopicAndPartition(topic, partition))
       leaderAfterShutdown = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
       assertEquals(0, leaderAfterShutdown)
 
-      assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
       partitionsRemaining = controller.shutdownBroker(0)
       assertEquals(1, partitionsRemaining.size)
       // leader doesn't change since all the replicas are shut down
-      assertTrue(servers.foldLeft(true)(_ && _.apis.leaderCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
+      assertTrue(servers.foldLeft(true)(_ && _.apis.metadataCache(TopicAndPartition(topic, partition)).leaderIsrAndControllerEpoch.leaderAndIsr.leader == 0))
     }
     finally {
       servers.foreach(_.shutdown())
@@ -389,27 +390,4 @@ class AdminTest extends JUnit3Suite with ZooKeeperTestHarness with Logging {
     }
   }
 
-  private def checkForPhantomInSyncReplicas(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int]) {
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    // in sync replicas should not have any replica that is not in the new assigned replicas
-    val phantomInSyncReplicas = inSyncReplicas.toSet -- assignedReplicas.toSet
-    assertTrue("All in sync replicas %s must be in the assigned replica list %s".format(inSyncReplicas, assignedReplicas),
-      phantomInSyncReplicas.size == 0)
-  }
-
-  private def ensureNoUnderReplicatedPartitions(topic: String, partitionToBeReassigned: Int, assignedReplicas: Seq[Int],
-                                                servers: Seq[KafkaServer]) {
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitionToBeReassigned)
-    assertFalse("Reassigned partition [%s,%d] is underreplicated".format(topic, partitionToBeReassigned),
-                inSyncReplicas.size < assignedReplicas.size)
-    val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitionToBeReassigned)
-    assertTrue("Reassigned partition [%s,%d] is unavailable".format(topic, partitionToBeReassigned), leader.isDefined)
-    val leaderBroker = servers.filter(s => s.config.brokerId == leader.get).head
-    assertTrue("Reassigned partition [%s,%d] is underreplicated as reported by the leader %d".format(topic, partitionToBeReassigned, leader.get),
-      leaderBroker.replicaManager.underReplicatedPartitionCount() == 0)
-  }
-
-  private def checkIfReassignPartitionPathExists(): Boolean = {
-    ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
-  }
 }
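
The private helpers removed here (checkForPhantomInSyncReplicas, ensureNoUnderReplicatedPartitions, checkIfReassignPartitionPathExists) appear to move into TestUtils, with the ZkClient passed in explicitly so any suite can reuse them, as the updated call sites above suggest. For example, the relocated reassignment-path check would presumably keep the same body as the removed version, just with the client as a parameter:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    object SharedReassignmentChecks {
      // Same logic as the removed private helper, but the ZkClient is supplied
      // by the caller instead of being a field of the test class.
      def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean =
        ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
    }
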