Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/07 05:18:39 UTC

[3/3] git commit: KAFKA-330 Delete topic; reviewed by Jun Rao, Guozhang Wang and Joel Koshy

KAFKA-330 Delete topic; reviewed by Jun Rao, Guozhang Wang and Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/167acb83
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/167acb83
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/167acb83

Branch: refs/heads/trunk
Commit: 167acb832d7f104eb2c344dcfd7b914c763d881d
Parents: fa6339c
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Feb 6 20:18:10 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 6 20:18:24 2014 -0800

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |   3 +-
 .../main/scala/kafka/admin/TopicCommand.scala   |   3 +-
 .../kafka/api/ControlledShutdownResponse.scala  |   4 +-
 .../scala/kafka/api/LeaderAndIsrRequest.scala   |   1 +
 .../scala/kafka/api/StopReplicaRequest.scala    |  22 +-
 .../scala/kafka/api/StopReplicaResponse.scala   |  18 +-
 .../scala/kafka/api/UpdateMetadataRequest.scala |   1 +
 .../controller/ControllerChannelManager.scala   | 127 ++++---
 .../kafka/controller/KafkaController.scala      | 319 ++++++++++------
 .../controller/PartitionLeaderSelector.scala    |  16 +-
 .../controller/PartitionStateMachine.scala      | 106 +++++-
 .../kafka/controller/ReplicaStateMachine.scala  | 165 +++++---
 .../kafka/controller/TopicDeletionManager.scala | 373 ++++++++++++++++++
 .../scala/kafka/network/BlockingChannel.scala   |   2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  54 ++-
 .../scala/kafka/server/KafkaHealthcheck.scala   |   1 -
 .../scala/kafka/server/OffsetCheckpoint.scala   |   1 -
 .../scala/kafka/server/ReplicaManager.scala     |  29 +-
 .../scala/kafka/server/TopicConfigManager.scala |   9 +-
 .../kafka/server/ZookeeperLeaderElector.scala   |  10 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  12 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala |  52 +--
 .../unit/kafka/admin/DeleteTopicTest.scala      | 377 +++++++++++++++++++
 .../api/RequestResponseSerializationTest.scala  |   6 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |  32 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |  11 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala |  27 +-
 28 files changed, 1444 insertions(+), 341 deletions(-)
----------------------------------------------------------------------
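
The heart of this change is that topic deletion becomes asynchronous: the admin tooling only marks a topic for deletion under a ZooKeeper admin path, and the controller's new TopicDeletionManager drives the affected replicas through the deletion states. As a rough illustration only (not part of the commit; it assumes a reachable ZooKeeper and uses the ZkUtils helpers referenced later in this diff), the set of topics still queued for deletion can be read back like this:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZkUtils, ZKStringSerializer}

    // Sketch: list the topics the controller has not yet finished deleting.
    // The connection string is a placeholder for illustration.
    val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
    val queuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath)
    println("Topics queued for deletion: " + queuedForDeletion.mkString(","))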


http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a167756..36ddeb4 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -140,8 +140,7 @@ object AdminUtils extends Logging {
   }
   
   def deleteTopic(zkClient: ZkClient, topic: String) {
-    zkClient.deleteRecursive(ZkUtils.getTopicPath(topic))
-    zkClient.deleteRecursive(ZkUtils.getTopicConfigPath(topic))
+    ZkUtils.createPersistentPath(zkClient, ZkUtils.getDeleteTopicPath(topic))
   }
   
   def topicExists(zkClient: ZkClient, topic: String): Boolean = 
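
Note that deleteTopic no longer tears down the topic's ZooKeeper state itself; it only creates a marker node that the controller acts on later. A minimal sketch of what a caller now observes (not part of the commit; it assumes an existing ZkClient connection created with ZKStringSerializer):

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils

    // zkClient: an org.I0Itec.zkclient.ZkClient assumed to already exist.
    // Only the deletion marker is created here; the topic data is removed later by the controller.
    AdminUtils.deleteTopic(zkClient, "test-topic")
    // The marker remains until the controller's TopicDeletionManager completes the deletion.
    val stillPending = zkClient.exists(ZkUtils.getDeleteTopicPath("test-topic"))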

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 842c110..65510eb 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -118,7 +118,7 @@ object TopicCommand {
     val topics = getTopics(zkClient, opts)
     topics.foreach { topic =>
       AdminUtils.deleteTopic(zkClient, topic)
-      println("Topic \"%s\" deleted.".format(topic))
+      println("Topic \"%s\" queued for deletion.".format(topic))
     }
   }
   
@@ -257,7 +257,6 @@ object TopicCommand {
     val topicsWithOverridesOpt = parser.accepts("topics-with-overrides",
                                                 "if set when listing topics, only show topics that have overridden configs")
 
-
     val options = parser.parse(args : _*)
   }
   

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
index a80aa49..46ec3db 100644
--- a/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
+++ b/core/src/main/scala/kafka/api/ControlledShutdownResponse.scala
@@ -18,11 +18,9 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-import collection.mutable.HashMap
-import collection.immutable.Map
 import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.ApiUtils._
-
+import collection.Set
 
 object ControlledShutdownResponse {
   def readFrom(buffer: ByteBuffer): ControlledShutdownResponse = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index a984878..0311737 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -26,6 +26,7 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.common.ErrorMapping
 import kafka.network.RequestChannel.Response
+import collection.Set
 
 
 object LeaderAndIsr {

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/StopReplicaRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaRequest.scala b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
index 820f0f5..68fc138 100644
--- a/core/src/main/scala/kafka/api/StopReplicaRequest.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaRequest.scala
@@ -17,13 +17,13 @@
 
 package kafka.api
 
-
 import java.nio._
 import kafka.api.ApiUtils._
 import kafka.network.{BoundedByteBufferSend, RequestChannel, InvalidRequestException}
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.network.RequestChannel.Response
 import kafka.utils.Logging
+import collection.Set
 
 
 object StopReplicaRequest extends Logging {
@@ -44,9 +44,9 @@ object StopReplicaRequest extends Logging {
         throw new InvalidRequestException("Invalid byte %d in delete partitions field. (Assuming false.)".format(x))
     }
     val topicPartitionPairCount = buffer.getInt
-    val topicPartitionPairSet = new collection.mutable.HashSet[(String, Int)]()
+    val topicPartitionPairSet = new collection.mutable.HashSet[TopicAndPartition]()
     (1 to topicPartitionPairCount) foreach { _ =>
-      topicPartitionPairSet.add(readShortString(buffer), buffer.getInt)
+      topicPartitionPairSet.add(TopicAndPartition(readShortString(buffer), buffer.getInt))
     }
     StopReplicaRequest(versionId, correlationId, clientId, controllerId, controllerEpoch,
                        deletePartitions, topicPartitionPairSet.toSet)
@@ -59,10 +59,10 @@ case class StopReplicaRequest(versionId: Short,
                               controllerId: Int,
                               controllerEpoch: Int,
                               deletePartitions: Boolean,
-                              partitions: Set[(String, Int)])
+                              partitions: Set[TopicAndPartition])
         extends RequestOrResponse(Some(RequestKeys.StopReplicaKey), correlationId) {
 
-  def this(deletePartitions: Boolean, partitions: Set[(String, Int)], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
+  def this(deletePartitions: Boolean, partitions: Set[TopicAndPartition], controllerId: Int, controllerEpoch: Int, correlationId: Int) = {
     this(StopReplicaRequest.CurrentVersion, correlationId, StopReplicaRequest.DefaultClientId,
          controllerId, controllerEpoch, deletePartitions, partitions)
   }
@@ -75,9 +75,9 @@ case class StopReplicaRequest(versionId: Short,
     buffer.putInt(controllerEpoch)
     buffer.put(if (deletePartitions) 1.toByte else 0.toByte)
     buffer.putInt(partitions.size)
-    for ((topic, partitionId) <- partitions){
-      writeShortString(buffer, topic)
-      buffer.putInt(partitionId)
+    for (topicAndPartition <- partitions) {
+      writeShortString(buffer, topicAndPartition.topic)
+      buffer.putInt(topicAndPartition.partition)
     }
   }
 
@@ -90,8 +90,8 @@ case class StopReplicaRequest(versionId: Short,
       4 + /* controller epoch */
       1 + /* deletePartitions */
       4 /* partition count */
-    for ((topic, partitionId) <- partitions){
-      size += (ApiUtils.shortStringLength(topic)) +
+    for (topicAndPartition <- partitions){
+      size += (ApiUtils.shortStringLength(topicAndPartition.topic)) +
               4 /* partition id */
     }
     size
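
With the switch from (String, Int) pairs to TopicAndPartition, building the request now reads as follows (a sketch with arbitrary ids, relying on the secondary constructor shown above):

    import kafka.api.StopReplicaRequest
    import kafka.common.TopicAndPartition

    // Hypothetical values; version and client id are filled in by the secondary constructor.
    val stopReplicaRequest = new StopReplicaRequest(
      true,                                    // deletePartitions
      Set(TopicAndPartition("test-topic", 0)), // partitions, now keyed by TopicAndPartition
      0,                                       // controllerId
      1,                                       // controllerEpoch
      42)                                      // correlationId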

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/StopReplicaResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/StopReplicaResponse.scala b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
index d7e3630..c90ddee 100644
--- a/core/src/main/scala/kafka/api/StopReplicaResponse.scala
+++ b/core/src/main/scala/kafka/api/StopReplicaResponse.scala
@@ -20,7 +20,7 @@ package kafka.api
 import java.nio.ByteBuffer
 import collection.mutable.HashMap
 import collection.immutable.Map
-import kafka.common.ErrorMapping
+import kafka.common.{TopicAndPartition, ErrorMapping}
 import kafka.api.ApiUtils._
 
 
@@ -30,12 +30,12 @@ object StopReplicaResponse {
     val errorCode = buffer.getShort
     val numEntries = buffer.getInt
 
-    val responseMap = new HashMap[(String, Int), Short]()
+    val responseMap = new HashMap[TopicAndPartition, Short]()
     for (i<- 0 until numEntries){
       val topic = readShortString(buffer)
       val partition = buffer.getInt
       val partitionErrorCode = buffer.getShort()
-      responseMap.put((topic, partition), partitionErrorCode)
+      responseMap.put(TopicAndPartition(topic, partition), partitionErrorCode)
     }
     new StopReplicaResponse(correlationId, responseMap.toMap, errorCode)
   }
@@ -43,7 +43,7 @@ object StopReplicaResponse {
 
 
 case class StopReplicaResponse(override val correlationId: Int,
-                               val responseMap: Map[(String, Int), Short],
+                               val responseMap: Map[TopicAndPartition, Short],
                                val errorCode: Short = ErrorMapping.NoError)
     extends RequestOrResponse(correlationId = correlationId) {
   def sizeInBytes(): Int ={
@@ -53,7 +53,7 @@ case class StopReplicaResponse(override val correlationId: Int,
       4 /* number of responses */
     for ((key, value) <- responseMap) {
       size +=
-        2 + key._1.length /* topic */ +
+        2 + key.topic.length /* topic */ +
         4 /* partition */ +
         2 /* error code for this partition */
     }
@@ -64,10 +64,10 @@ case class StopReplicaResponse(override val correlationId: Int,
     buffer.putInt(correlationId)
     buffer.putShort(errorCode)
     buffer.putInt(responseMap.size)
-    for ((key:(String, Int), value) <- responseMap){
-      writeShortString(buffer, key._1)
-      buffer.putInt(key._2)
-      buffer.putShort(value)
+    for ((topicAndPartition, errorCode) <- responseMap){
+      writeShortString(buffer, topicAndPartition.topic)
+      buffer.putInt(topicAndPartition.partition)
+      buffer.putShort(errorCode)
     }
   }
 

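On the receiving side the response map is keyed by TopicAndPartition as well, so per-partition error handling can destructure it directly. A sketch (not part of the commit; it assumes a stopReplicaResponse already read off the wire):

    import kafka.common.ErrorMapping

    // Log any partitions whose StopReplica handling failed on the broker.
    stopReplicaResponse.responseMap.foreach { case (topicAndPartition, errorCode) =>
      if (errorCode != ErrorMapping.NoError)
        println("StopReplica failed for %s with error code %d".format(topicAndPartition, errorCode))
    }
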
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
index 54dd7bd..543e262 100644
--- a/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
+++ b/core/src/main/scala/kafka/api/UpdateMetadataRequest.scala
@@ -22,6 +22,7 @@ import kafka.cluster.Broker
 import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.network.RequestChannel.Response
+import collection.Set
 
 object UpdateMetadataRequest {
   val CurrentVersion = 0.shortValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index ea8485b..a1ee5a7 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -25,7 +25,10 @@ import kafka.server.KafkaConfig
 import collection.mutable
 import kafka.api._
 import org.apache.log4j.Logger
+import scala.Some
 import kafka.common.TopicAndPartition
+import kafka.api.RequestOrResponse
+import collection.Set
 
 class ControllerChannelManager (private val controllerContext: ControllerContext, config: KafkaConfig) extends Logging {
   private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
@@ -118,10 +121,8 @@ class RequestSendThread(val controllerId: Int,
     val queueItem = queue.take()
     val request = queueItem._1
     val callback = queueItem._2
-
     var receive: Receive = null
-
-    try{
+    try {
       lock synchronized {
         var isSendSuccessful = false
         while(isRunning.get() && !isSendSuccessful) {
@@ -155,7 +156,7 @@ class RequestSendThread(val controllerId: Int,
         stateChangeLogger.trace("Controller %d epoch %d received response correlationId %d for a request sent to broker %s"
                                   .format(controllerId, controllerContext.epoch, response.correlationId, toBroker.toString()))
 
-        if(callback != null){
+        if(callback != null) {
           callback(response)
         }
       }
@@ -180,12 +181,12 @@ class RequestSendThread(val controllerId: Int,
   }
 }
 
-class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendRequest: (Int, RequestOrResponse, (RequestOrResponse) => Unit) => Unit,
-                                   controllerId: Int, clientId: String)
-  extends  Logging {
+class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging {
+  val controllerContext = controller.controllerContext
+  val controllerId: Int = controller.config.brokerId
+  val clientId: String = controller.clientId
   val leaderAndIsrRequestMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), PartitionStateInfo]]
-  val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
-  val stopAndDeleteReplicaRequestMap = new mutable.HashMap[Int, Seq[(String, Int)]]
+  val stopReplicaRequestMap = new mutable.HashMap[Int, Seq[StopReplicaRequestInfo]]
   val updateMetadataRequestMap = new mutable.HashMap[Int, mutable.HashMap[TopicAndPartition, PartitionStateInfo]]
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
 
@@ -200,52 +201,47 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
     if(updateMetadataRequestMap.size > 0)
       throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
         "new one. Some UpdateMetadata state changes %s might be lost ".format(updateMetadataRequestMap.toString()))
-    if(stopAndDeleteReplicaRequestMap.size > 0)
-      throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
-        "new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
   }
 
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
-                                       replicas: Seq[Int]) {
-    brokerIds.foreach { brokerId =>
+                                       replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) {
+    brokerIds.filter(b => b >= 0).foreach { brokerId =>
       leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
       leaderAndIsrRequestMap(brokerId).put((topic, partition),
         PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
     }
-    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(TopicAndPartition(topic, partition)))
+    addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
   }
 
-  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean) {
-    brokerIds.foreach { brokerId =>
-      stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
-      stopAndDeleteReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[(String, Int)])
-      if (deletePartition) {
-        val v = stopAndDeleteReplicaRequestMap(brokerId)
-        stopAndDeleteReplicaRequestMap(brokerId) = v :+ (topic, partition)
-      }
-      else {
-        val v = stopReplicaRequestMap(brokerId)
-        stopReplicaRequestMap(brokerId) = v :+ (topic, partition)
-      }
+  def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,
+                                      callback: (RequestOrResponse, Int) => Unit = null) {
+    brokerIds.filter(b => b >= 0).foreach { brokerId =>
+      stopReplicaRequestMap.getOrElseUpdate(brokerId, Seq.empty[StopReplicaRequestInfo])
+      val v = stopReplicaRequestMap(brokerId)
+      if(callback != null)
+        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
+          deletePartition, (r: RequestOrResponse) => { callback(r, brokerId) })
+      else
+        stopReplicaRequestMap(brokerId) = v :+ StopReplicaRequestInfo(PartitionAndReplica(topic, partition, brokerId),
+          deletePartition)
     }
   }
 
+  /* Send UpdateMetadataRequest to the given brokers for all partitions except those being deleted as part of delete topic
+   *
+   */
   def addUpdateMetadataRequestForBrokers(brokerIds: Seq[Int],
-                                         partitions:scala.collection.Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
-    val partitionList =
-    if(partitions.isEmpty) {
-      controllerContext.partitionLeadershipInfo.keySet
-    } else {
-      partitions
-    }
+                                         callback: (RequestOrResponse) => Unit = null) {
+    val partitionList = controllerContext.partitionLeadershipInfo.keySet.dropWhile(
+      p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
     partitionList.foreach { partition =>
       val leaderIsrAndControllerEpochOpt = controllerContext.partitionLeadershipInfo.get(partition)
       leaderIsrAndControllerEpochOpt match {
         case Some(leaderIsrAndControllerEpoch) =>
           val replicas = controllerContext.partitionReplicaAssignment(partition).toSet
           val partitionStateInfo = PartitionStateInfo(leaderIsrAndControllerEpoch, replicas)
-          brokerIds.foreach { brokerId =>
+          brokerIds.filter(b => b >= 0).foreach { brokerId =>
             updateMetadataRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[TopicAndPartition, PartitionStateInfo])
             updateMetadataRequestMap(brokerId).put(partition, partitionStateInfo)
           }
@@ -269,7 +265,7 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
                                                                  p._2.leaderIsrAndControllerEpoch, correlationId, broker,
                                                                  p._1._1, p._1._2))
       }
-      sendRequest(broker, leaderAndIsrRequest, null)
+      controller.sendRequest(broker, leaderAndIsrRequest, null)
     }
     leaderAndIsrRequestMap.clear()
     updateMetadataRequestMap.foreach { m =>
@@ -280,24 +276,23 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
       partitionStateInfos.foreach(p => stateChangeLogger.trace(("Controller %d epoch %d sending UpdateMetadata request %s with " +
         "correlationId %d to broker %d for partition %s").format(controllerId, controllerEpoch, p._2.leaderIsrAndControllerEpoch,
                                                                  correlationId, broker, p._1)))
-      sendRequest(broker, updateMetadataRequest, null)
+      controller.sendRequest(broker, updateMetadataRequest, null)
     }
     updateMetadataRequestMap.clear()
-    Seq((stopReplicaRequestMap, false), (stopAndDeleteReplicaRequestMap, true)) foreach {
-      case(m, deletePartitions) => {
-        m foreach {
-          case(broker, replicas) =>
-            if (replicas.size > 0) {
-              debug("The stop replica request (delete = %s) sent to broker %d is %s"
-                .format(deletePartitions, broker, replicas.mkString(",")))
-              val stopReplicaRequest = new StopReplicaRequest(deletePartitions, Set.empty[(String, Int)] ++ replicas, controllerId,
-                                                              controllerEpoch, correlationId)
-              sendRequest(broker, stopReplicaRequest, null)
-            }
-        }
-        m.clear()
+    stopReplicaRequestMap foreach { case(broker, replicaInfoList) =>
+      val stopReplicaWithDelete = replicaInfoList.filter(p => p.deletePartition == true).map(i => i.replica).toSet
+      val stopReplicaWithoutDelete = replicaInfoList.filter(p => p.deletePartition == false).map(i => i.replica).toSet
+      debug("The stop replica request (delete = true) sent to broker %d is %s"
+        .format(broker, stopReplicaWithDelete.mkString(",")))
+      debug("The stop replica request (delete = false) sent to broker %d is %s"
+        .format(broker, stopReplicaWithoutDelete.mkString(",")))
+      replicaInfoList.foreach { r =>
+        val stopReplicaRequest = new StopReplicaRequest(r.deletePartition,
+          Set(TopicAndPartition(r.replica.topic, r.replica.partition)), controllerId, controllerEpoch, correlationId)
+        controller.sendRequest(broker, stopReplicaRequest, r.callback)
       }
     }
+    stopReplicaRequestMap.clear()
   }
 }
 
@@ -306,3 +301,35 @@ case class ControllerBrokerStateInfo(channel: BlockingChannel,
                                      messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                                      requestSendThread: RequestSendThread)
 
+case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: (RequestOrResponse) => Unit = null)
+
+class Callbacks private (var leaderAndIsrResponseCallback:(RequestOrResponse) => Unit = null,
+                         var updateMetadataResponseCallback:(RequestOrResponse) => Unit = null,
+                         var stopReplicaResponseCallback:(RequestOrResponse, Int) => Unit = null)
+
+object Callbacks {
+  class CallbackBuilder {
+    var leaderAndIsrResponseCbk:(RequestOrResponse) => Unit = null
+    var updateMetadataResponseCbk:(RequestOrResponse) => Unit = null
+    var stopReplicaResponseCbk:(RequestOrResponse, Int) => Unit = null
+
+    def leaderAndIsrCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
+      leaderAndIsrResponseCbk = cbk
+      this
+    }
+
+    def updateMetadataCallback(cbk: (RequestOrResponse) => Unit): CallbackBuilder = {
+      updateMetadataResponseCbk = cbk
+      this
+    }
+
+    def stopReplicaCallback(cbk: (RequestOrResponse, Int) => Unit): CallbackBuilder = {
+      stopReplicaResponseCbk = cbk
+      this
+    }
+
+    def build: Callbacks = {
+      new Callbacks(leaderAndIsrResponseCbk, updateMetadataResponseCbk, stopReplicaResponseCbk)
+    }
+  }
+}
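
The new CallbackBuilder lets the state machines attach only the callbacks a given state change cares about; anything left unset stays null and is simply skipped when responses arrive. A usage sketch (hypothetical callback body, not taken from the commit):

    import kafka.api.RequestOrResponse
    import kafka.controller.Callbacks

    val callbacks = new Callbacks.CallbackBuilder()
      .stopReplicaCallback((response: RequestOrResponse, brokerId: Int) =>
        println("StopReplica response received from broker " + brokerId))
      .build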

http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index a0267ae..d812cb4 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -17,42 +17,42 @@
 package kafka.controller
 
 import collection._
-import collection.immutable.Set
+import collection.Set
 import com.yammer.metrics.core.Gauge
 import java.lang.{IllegalStateException, Object}
 import java.util.concurrent.TimeUnit
-import kafka.admin.{AdminOperationException, PreferredReplicaLeaderElectionCommand}
+import kafka.admin.PreferredReplicaLeaderElectionCommand
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common._
 import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 import kafka.server.{ZookeeperLeaderElector, KafkaConfig}
 import kafka.utils.ZkUtils._
-import kafka.utils.{Json, Utils, ZkUtils, Logging, KafkaScheduler}
+import kafka.utils._
+import kafka.utils.Utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
+import org.apache.log4j.Logger
 import scala.Some
 import kafka.common.TopicAndPartition
-import org.apache.log4j.Logger
+import java.util.concurrent.locks.ReentrantLock
 
 class ControllerContext(val zkClient: ZkClient,
-                        val zkSessionTimeout: Int,
-                        var controllerChannelManager: ControllerChannelManager = null,
-                        val controllerLock: Object = new Object,
-                        var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty,
-                        val brokerShutdownLock: Object = new Object,
-                        var epoch: Int = KafkaController.InitialControllerEpoch - 1,
-                        var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1,
-                        val correlationId: AtomicInteger = new AtomicInteger(0),
-                        var allTopics: Set[String] = Set.empty,
-                        var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty,
-                        var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty,
-                        var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] =
-                          new mutable.HashMap,
-                        var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] =
-                          new mutable.HashSet) {
+                        val zkSessionTimeout: Int) {
+  var controllerChannelManager: ControllerChannelManager = null
+  val controllerLock: ReentrantLock = new ReentrantLock()
+  var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
+  val brokerShutdownLock: Object = new Object
+  var epoch: Int = KafkaController.InitialControllerEpoch - 1
+  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
+  val correlationId: AtomicInteger = new AtomicInteger(0)
+  var allTopics: Set[String] = Set.empty
+  var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
+  var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
+  var partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
+  var partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
 
   private var liveBrokersUnderlying: Set[Broker] = Set.empty
   private var liveBrokerIdsUnderlying: Set[Int] = Set.empty
@@ -86,9 +86,37 @@ class ControllerContext(val zkClient: ZkClient,
     }.flatten.toSet
   }
 
+  def replicasForTopic(topic: String): Set[PartitionAndReplica] = {
+    partitionReplicaAssignment
+      .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }
+      .map { case(topicAndPartition, replicas) =>
+        replicas.map { r =>
+          new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, r)
+        }
+    }.flatten.toSet
+  }
+
+  def partitionsForTopic(topic: String): collection.Set[TopicAndPartition] = {
+    partitionReplicaAssignment
+      .filter { case(topicAndPartition, replicas) => topicAndPartition.topic.equals(topic) }.keySet
+  }
+
   def allLiveReplicas(): Set[PartitionAndReplica] = {
     replicasOnBrokers(liveBrokerIds)
   }
+
+  def replicasForPartition(partitions: collection.Set[TopicAndPartition]): collection.Set[PartitionAndReplica] = {
+    partitions.map { p =>
+      val replicas = partitionReplicaAssignment(p)
+      replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
+    }.flatten
+  }
+
+  def removeTopic(topic: String) = {
+    partitionLeadershipInfo = partitionLeadershipInfo.dropWhile(p => p._1.topic.equals(topic))
+    partitionReplicaAssignment = partitionReplicaAssignment.dropWhile(p => p._1.topic.equals(topic))
+    allTopics -= topic
+  }
 }
 
 trait KafkaControllerMBean {
@@ -128,18 +156,19 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   private var isRunning = true
   private val stateChangeLogger = Logger.getLogger(KafkaController.stateChangeLogger)
   val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
-  private val partitionStateMachine = new PartitionStateMachine(this)
-  private val replicaStateMachine = new ReplicaStateMachine(this)
+  val partitionStateMachine = new PartitionStateMachine(this)
+  val replicaStateMachine = new ReplicaStateMachine(this)
   private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
     onControllerResignation, config.brokerId)
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
   private val autoRebalanceScheduler = new KafkaScheduler(1)
+  var deleteTopicManager: TopicDeletionManager = null
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
   private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
-  private val brokerRequestBatch = new ControllerBrokerRequestBatch(controllerContext, sendRequest, this.config.brokerId, this.clientId)
+  private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
   registerControllerChangedListener()
 
   newGauge(
@@ -153,7 +182,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     "OfflinePartitionsCount",
     new Gauge[Int] {
       def value(): Int = {
-        controllerContext.controllerLock synchronized {
+        inLock(controllerContext.controllerLock) {
           if (!isActive())
             0
           else
@@ -167,7 +196,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     "PreferredReplicaImbalanceCount",
     new Gauge[Int] {
       def value(): Int = {
-        controllerContext.controllerLock synchronized {
+        inLock(controllerContext.controllerLock) {
           if (!isActive())
             0
           else
@@ -200,7 +229,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.brokerShutdownLock synchronized {
       info("Shutting down broker " + id)
 
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id))
           throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
 
@@ -211,7 +240,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       }
 
       val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
-        controllerContext.controllerLock synchronized {
+        inLock(controllerContext.controllerLock) {
           controllerContext.partitionsOnBroker(id)
             .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
         }
@@ -219,7 +248,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       allPartitionsAndReplicationFactorOnBroker.foreach {
         case(topicAndPartition, replicationFactor) =>
         // Move leadership serially to relinquish lock.
-        controllerContext.controllerLock synchronized {
+        inLock(controllerContext.controllerLock) {
           controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
             if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
               // If the broker leads the topic partition, transition the leader and update isr. Updates zk and
@@ -231,7 +260,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               // Stop the replica first. The state change below initiates ZK changes which should take some time
               // before which the stop replica request should be completed (in most cases)
               brokerRequestBatch.newBatch()
-              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
+              brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic,
+                topicAndPartition.partition, deletePartition = false)
               brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
 
               // If the broker is a follower, updates the isr in ZK and notifies the current leader
@@ -242,7 +272,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         }
       }
 
-      def replicatedPartitionsBrokerLeads() = controllerContext.controllerLock.synchronized {
+      def replicatedPartitionsBrokerLeads() = inLock(controllerContext.controllerLock) {
         trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
         controllerContext.partitionLeadershipInfo.filter {
           case (topicAndPartition, leaderIsrAndControllerEpoch) =>
@@ -283,8 +313,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
       Utils.registerMBean(this, KafkaController.MBeanName)
       info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
-      initializeAndMaybeTriggerPartitionReassignment()
-      initializeAndMaybeTriggerPreferredReplicaElection()
+      maybeTriggerPartitionReassignment()
+      maybeTriggerPreferredReplicaElection()
       /* send partition leadership info to all live brokers */
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
       if (config.autoLeaderRebalanceEnable) {
@@ -293,6 +323,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
           5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
       }
+      deleteTopicManager.start()
     }
     else
       info("Controller has been shut down, aborting startup/failover")
@@ -303,7 +334,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * required to clean up internal controller data structures
    */
   def onControllerResignation() {
-    controllerContext.controllerLock synchronized {
+    inLock(controllerContext.controllerLock) {
+      autoRebalanceScheduler.shutdown()
+      deleteTopicManager.shutdown()
       Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
@@ -318,7 +351,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * Returns true if this broker is the current controller.
    */
   def isActive(): Boolean = {
-    controllerContext.controllerLock synchronized {
+    inLock(controllerContext.controllerLock) {
       controllerContext.controllerChannelManager != null
     }
   }
@@ -338,7 +371,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def onBrokerStartup(newBrokers: Seq[Int]) {
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
-
     val newBrokersSet = newBrokers.toSet
     // send update metadata request for all partitions to the newly restarted brokers. In cases of controlled shutdown
     // leaders will not be elected when a new broker comes up. So at least in the common controlled shutdown case, the
@@ -346,16 +378,25 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     sendUpdateMetadataRequest(newBrokers)
     // the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
     // supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
-    replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(newBrokersSet), OnlineReplica)
+    val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
+    replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
     // when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
     // to see if these brokers can become leaders for some/all of those
     partitionStateMachine.triggerOnlinePartitionStateChange()
     // check if reassignment of some partitions need to be restarted
-    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter{
-      case (topicAndPartition, reassignmentContext) =>
-        reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
+    val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
+      case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
     }
     partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
+    // check if topic deletion needs to be resumed. If at least one replica that belongs to the topic being deleted exists
+    // on the newly restarted brokers, there is a chance that topic deletion can resume
+    val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+    if(replicasForTopicsToBeDeleted.size > 0) {
+      info(("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. " +
+        "Signaling restart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
+        deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
+      deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
+    }
   }
 
   /**
@@ -371,20 +412,30 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   def onBrokerFailure(deadBrokers: Seq[Int]) {
     info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
-
     val deadBrokersThatWereShuttingDown =
       deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
     info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
-
     val deadBrokersSet = deadBrokers.toSet
     // trigger OfflinePartition state for all partitions whose current leader is one amongst the dead brokers
     val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
-      deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader)).keySet
+      deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
+        !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
     partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
     // trigger OnlinePartition state changes for offline or new partitions
     partitionStateMachine.triggerOnlinePartitionStateChange()
+    // filter out the replicas that belong to topics that are being deleted
+    var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
+    val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
     // handle dead replicas
-    replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(deadBrokersSet), OfflineReplica)
+    replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
+    // check if topic deletion state for the dead replicas needs to be updated
+    val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+    if(replicasForTopicsToBeDeleted.size > 0) {
+      // it is required to mark the respective replicas in TopicDeletionFailed state since the replica cannot be
+      // deleted when the broker is down. This will prevent the replica from being in TopicDeletionStarted state indefinitely
+      // since topic deletion cannot be retried if at least one replica is in TopicDeletionStarted state
+      deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
+    }
   }
 
   /**
@@ -401,7 +452,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   /**
-   * This callback is invoked by the partition state machine's partition change listener with the list of new partitions.
+   * This callback is invoked by the topic change callback with the list of failed brokers as input.
    * It does the following -
    * 1. Move the newly created partitions to the NewPartition state
    * 2. Move the newly created partitions from NewPartition->OnlinePartition state
@@ -409,9 +460,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
     info("New partition creation callback for %s".format(newPartitions.mkString(",")))
     partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
-    replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), NewReplica)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
     partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
-    replicaStateMachine.handleStateChanges(getAllReplicasForPartition(newPartitions), OnlineReplica)
+    replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
   }
 
   /**
@@ -493,8 +544,10 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         removePartitionFromReassignedPartitions(topicAndPartition)
         info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
         controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
-        //12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
-        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
+        //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
+        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
+        // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
+        deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
     }
   }
 
@@ -528,6 +581,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
               // first register ISR change listener
               watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
               controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
+              // halt topic deletion for the partitions being reassigned
+              deleteTopicManager.haltTopicDeletion(Set(topic))
               onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
             } else {
               // some replica in RAR is not alive. Fail partition reassignment
@@ -550,11 +605,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
+      deleteTopicManager.haltTopicDeletion(partitions.map(_.topic))
       partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
     } catch {
       case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
     } finally {
       removePartitionsFromPreferredReplicaElection(partitions)
+      deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
     }
   }
 
@@ -564,7 +621,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * elector
    */
   def startup() = {
-    controllerContext.controllerLock synchronized {
+    inLock(controllerContext.controllerLock) {
       info("Controller starting up");
       registerSessionExpirationListener()
       isRunning = true
@@ -579,7 +636,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * shuts down the controller channel manager, if one exists (i.e. if it was the current controller)
    */
   def shutdown() = {
-    controllerContext.controllerLock synchronized {
+    inLock(controllerContext.controllerLock) {
       isRunning = false
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
@@ -633,6 +690,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   }
 
   private def initializeControllerContext() {
+    // update controller cache with delete topic information
     controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
     controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
     controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
@@ -642,42 +700,74 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     updateLeaderAndIsrCache()
     // start the channel manager
     startChannelManager()
+    initializePreferredReplicaElection()
+    initializePartitionReassignment()
+    initializeTopicDeletion()
     info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
     info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
     info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
   }
 
-  private def initializeAndMaybeTriggerPartitionReassignment() {
+  private def initializePreferredReplicaElection() {
+    // initialize preferred replica election state
+    val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
+    // check if they are already completed or topic was deleted
+    val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter { partition =>
+      val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition)
+      val topicDeleted = replicasOpt.isEmpty
+      val successful =
+        if(!topicDeleted) controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == replicasOpt.get.head else false
+      successful || topicDeleted
+    }
+    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
+    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
+    info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
+    info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
+    info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+  }
+
+  private def initializePartitionReassignment() {
     // read the partitions being reassigned from zookeeper path /admin/reassign_partitions
     val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
-    // check if they are already completed
-    val reassignedPartitions = partitionsBeingReassigned.filter(partition =>
-      controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1)
+    // check if they are already completed or topic was deleted
+    val reassignedPartitions = partitionsBeingReassigned.filter { partition =>
+      val replicasOpt = controllerContext.partitionReplicaAssignment.get(partition._1)
+      val topicDeleted = replicasOpt.isEmpty
+      val successful = if(!topicDeleted) replicasOpt.get == partition._2.newReplicas else false
+      topicDeleted || successful
+    }.map(_._1)
     reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p))
     var partitionsToReassign: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
     partitionsToReassign ++= partitionsBeingReassigned
     partitionsToReassign --= reassignedPartitions
-
+    controllerContext.partitionsBeingReassigned ++= partitionsToReassign
     info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString()))
     info("Partitions already reassigned: %s".format(reassignedPartitions.toString()))
     info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString()))
+  }
 
-    partitionsToReassign.foreach { topicPartitionToReassign =>
+  private def initializeTopicDeletion() {
+    val topicsQueuedForDeletion = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.DeleteTopicsPath).toSet
+    val replicasOnDeadBrokers = controllerContext.partitionReplicaAssignment.filter(r =>
+      r._2.foldLeft(false)((res,r) => res || !controllerContext.liveBrokerIds.contains(r)))
+    val topicsWithReplicasOnDeadBrokers = replicasOnDeadBrokers.map(_._1.topic).toSet
+    val topicsForWhichPartitionReassignmentIsInProgress = controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic)
+    val topicsForWhichPreferredReplicaElectionIsInProgress = controllerContext.partitionsBeingReassigned.keySet.map(_.topic)
+    val haltedTopicsForDeletion = topicsWithReplicasOnDeadBrokers | topicsForWhichPartitionReassignmentIsInProgress |
+                                  topicsForWhichPreferredReplicaElectionIsInProgress
+    info("List of topics to be deleted: %s".format(topicsQueuedForDeletion.mkString(",")))
+    info("List of topics halted for deletion: %s".format(haltedTopicsForDeletion.mkString(",")))
+    // initialize the topic deletion manager
+    deleteTopicManager = new TopicDeletionManager(this, topicsQueuedForDeletion, haltedTopicsForDeletion)
+  }
+
+  private def maybeTriggerPartitionReassignment() {
+    controllerContext.partitionsBeingReassigned.foreach { topicPartitionToReassign =>
       initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2)
     }
   }
 
-  private def initializeAndMaybeTriggerPreferredReplicaElection() {
-    // read the partitions undergoing preferred replica election from zookeeper path
-    val partitionsUndergoingPreferredReplicaElection = ZkUtils.getPartitionsUndergoingPreferredReplicaElection(zkClient)
-    // check if they are already completed
-    val partitionsThatCompletedPreferredReplicaElection = partitionsUndergoingPreferredReplicaElection.filter(partition =>
-      controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader == controllerContext.partitionReplicaAssignment(partition).head)
-    controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitionsUndergoingPreferredReplicaElection
-    controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsThatCompletedPreferredReplicaElection
-    info("Partitions undergoing preferred replica election: %s".format(partitionsUndergoingPreferredReplicaElection.mkString(",")))
-    info("Partitions that completed preferred replica election: %s".format(partitionsThatCompletedPreferredReplicaElection.mkString(",")))
-    info("Resuming preferred replica election for partitions: %s".format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+  private def maybeTriggerPreferredReplicaElection() {
     onPreferredReplicaElection(controllerContext.partitionsUndergoingPreferredReplicaElection.toSet)
   }
 
@@ -736,13 +826,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     val topic = topicAndPartition.topic
     val partition = topicAndPartition.partition
     // first move the replica to offline state (the controller removes it from the ISR)
-    oldReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), OfflineReplica)
-    }
+    val replicasToBeDeleted = oldReplicas.map(r => PartitionAndReplica(topic, partition, r))
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted, OfflineReplica)
     // send stop replica command to the old replicas
-    oldReplicas.foreach { replica =>
-      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topic, partition, replica)), NonExistentReplica)
-    }
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionStarted)
+    // TODO: Eventually partition reassignment could use a callback that does retries if deletion failed
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted, ReplicaDeletionSuccessful)
+    replicaStateMachine.handleStateChanges(replicasToBeDeleted, NonExistentReplica)
   }
 
   private def updateAssignedReplicasForPartition(topicAndPartition: TopicAndPartition,
@@ -838,22 +928,14 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     controllerContext.partitionsUndergoingPreferredReplicaElection --= partitionsToBeRemoved
   }
 
-  private def getAllReplicasForPartition(partitions: Set[TopicAndPartition]): Set[PartitionAndReplica] = {
-    partitions.map { p =>
-      val replicas = controllerContext.partitionReplicaAssignment(p)
-      replicas.map(r => new PartitionAndReplica(p.topic, p.partition, r))
-    }.flatten
-  }
-
   /**
    * Send the leader information for selected partitions to selected brokers so that they can correctly respond to
    * metadata requests
    * @param brokers The brokers that the update metadata request should be sent to
-   * @param partitions The partitions for which the metadata is to be sent
    */
-  private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) {
+  def sendUpdateMetadataRequest(brokers: Seq[Int]) {
     brokerRequestBatch.newBatch()
-    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions)
+    brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers)
     brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement)
   }
 
@@ -979,7 +1061,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     @throws(classOf[Exception])
     def handleNewSession() {
       info("ZK expired; shut down all controller components and try to re-elect")
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         onControllerResignation()
         controllerElector.elect
       }
@@ -991,10 +1073,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       trace("checking need to trigger partition rebalance")
       // get all the active brokers
       var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
-      controllerContext.controllerLock synchronized {
-        preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.groupBy {
-          case(topicAndPartition, assignedReplicas) => assignedReplicas.head
-        }
+      inLock(controllerContext.controllerLock) {
+        preferredReplicasForTopicsByBrokers =
+          controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
+            case(topicAndPartition, assignedReplicas) => assignedReplicas.head
+          }
       }
       debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
       // for each broker, check if a preferred replica election needs to be triggered
@@ -1002,7 +1085,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
         case(leaderBroker, topicAndPartitionsForBroker) => {
           var imbalanceRatio: Double = 0
           var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
-          controllerContext.controllerLock synchronized {
+          inLock(controllerContext.controllerLock) {
             topicsNotInPreferredReplica =
               topicAndPartitionsForBroker.filter {
                 case(topicPartition, replicas) => {
@@ -1018,7 +1101,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
           // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
           // that need to be on this broker
           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
-            controllerContext.controllerLock synchronized {
+            inLock(controllerContext.controllerLock) {
               // do this check only if the broker is live and there are no partitions being reassigned currently
               // and preferred replica election is not in progress
               if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
@@ -1070,11 +1153,19 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
     debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
       .format(dataPath, data))
     val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
-    val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
-    newPartitions.foreach { partitionToBeReassigned =>
-      controllerContext.controllerLock synchronized {
-        val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
-        controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+    val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
+      partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
+    }
+    partitionsToBeReassigned.foreach { partitionToBeReassigned =>
+      inLock(controllerContext.controllerLock) {
+        if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
+          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
+            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
+          controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
+        } else {
+          val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
+          controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
+        }
       }
     }
   }
@@ -1102,11 +1193,11 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
    */
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
-    try {
-      controllerContext.controllerLock synchronized {
-        debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
+    inLock(controllerContext.controllerLock) {
+      debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
+      val topicAndPartition = TopicAndPartition(topic, partition)
+      try {
         // check if this partition is still being reassigned or not
-        val topicAndPartition = TopicAndPartition(topic, partition)
         controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
           case Some(reassignedPartitionContext) =>
             // need to re-read leader and isr from zookeeper since the zkclient callback doesn't return the Stat object
@@ -1131,9 +1222,9 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
             }
           case None =>
         }
+      } catch {
+        case e: Throwable => error("Error while handling partition reassignment", e)
       }
-    }catch {
-      case e: Throwable => error("Error while handling partition reassignment", e)
     }
   }
 
@@ -1163,13 +1254,19 @@ class PreferredReplicaElectionListener(controller: KafkaController) extends IZkD
   def handleDataChange(dataPath: String, data: Object) {
     debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
             .format(dataPath, data.toString))
-    val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
-
-    controllerContext.controllerLock synchronized {
-      info("These partitions are already undergoing preferred replica election: %s"
-             .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
-      val newPartitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
-      controller.onPreferredReplicaElection(newPartitions)
+    inLock(controllerContext.controllerLock) {
+      val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
+      if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
+        info("These partitions are already undergoing preferred replica election: %s"
+          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
+      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
+      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
+      if(partitionsForTopicsToBeDeleted.size > 0) {
+        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
+          .format(partitionsForTopicsToBeDeleted))
+      }
+      else
+        controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
     }
   }
 
@@ -1194,7 +1291,7 @@ class ControllerEpochListener(controller: KafkaController) extends IZkDataListen
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
     debug("Controller epoch listener fired with new epoch " + data.toString)
-    controllerContext.controllerLock synchronized {
+    inLock(controllerContext.controllerLock) {
       // read the epoch path to get the zk version
       readControllerEpochFromZookeeper()
     }
@@ -1222,7 +1319,11 @@ class ControllerEpochListener(controller: KafkaController) extends IZkDataListen
 case class ReassignedPartitionsContext(var newReplicas: Seq[Int] = Seq.empty,
                                        var isrChangeListener: ReassignedPartitionsIsrChangeListener = null)
 
-case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
+case class PartitionAndReplica(topic: String, partition: Int, replica: Int) {
+  override def toString(): String = {
+    "[Topic=%s,Partition=%d,Replica=%d]".format(topic, partition, replica)
+  }
+}
 
 case class LeaderIsrAndControllerEpoch(val leaderAndIsr: LeaderAndIsr, controllerEpoch: Int) {
   override def toString(): String = {

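A note on the locking change that recurs throughout the KafkaController hunks above: bare "controllerContext.controllerLock synchronized { ... }" blocks are replaced with "inLock(controllerContext.controllerLock) { ... }", apparently pulled in via the kafka.utils.Utils._ import (visible in the PartitionStateMachine.scala diff below). A minimal sketch of such a lock-loan helper, assuming it follows the usual acquire/try/finally pattern; the object names and the standalone example are illustrative, not the project's code:

import java.util.concurrent.locks.{Lock, ReentrantLock}

object LockUtilsSketch {
  // Acquire the lock, run the block, and always release the lock, even if the block throws.
  def inLock[T](lock: Lock)(fun: => T): T = {
    lock.lock()
    try {
      fun
    } finally {
      lock.unlock()
    }
  }
}

object LockUtilsExample extends App {
  val controllerLock = new ReentrantLock()
  // The critical section can return a value, so callers read controller state under the lock.
  val liveBrokerCount = LockUtilsSketch.inLock(controllerLock) {
    Set(1, 2, 3).size
  }
  println(liveBrokerCount) // 3
}

Compared with synchronized, a helper of this shape keeps the locking discipline in one place and lets the controller use java.util.concurrent locks without changing every call site.
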
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index fd9200f..fa29bbe 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -48,24 +48,24 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
   def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
     controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
       case Some(assignedReplicas) =>
-        val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+        val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
         val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
         val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
         val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
         val newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
           case true =>
             debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s"
-              .format(topicAndPartition, liveAssignedReplicasToThisPartition.mkString(",")))
-            liveAssignedReplicasToThisPartition.isEmpty match {
+              .format(topicAndPartition, liveAssignedReplicas.mkString(",")))
+            liveAssignedReplicas.isEmpty match {
               case true =>
                 throw new NoReplicaOnlineException(("No replica for partition " +
                   "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
                   " Assigned replicas are: [%s]".format(assignedReplicas))
               case false =>
                 ControllerStats.uncleanLeaderElectionRate.mark()
-                val newLeader = liveAssignedReplicasToThisPartition.head
+                val newLeader = liveAssignedReplicas.head
                 warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss."
-                     .format(topicAndPartition, newLeader, liveAssignedReplicasToThisPartition.mkString(",")))
+                     .format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
                 new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
             }
           case false =>
@@ -75,7 +75,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
             new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
         }
         info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
-        (newLeaderAndIsr, liveAssignedReplicasToThisPartition)
+        (newLeaderAndIsr, liveAssignedReplicas)
       case None =>
         throw new NoReplicaOnlineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
     }
@@ -106,10 +106,10 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
       case None =>
         reassignedInSyncReplicas.size match {
           case 0 =>
-            throw new StateChangeFailedException("List of reassigned replicas for partition " +
+            throw new NoReplicaOnlineException("List of reassigned replicas for partition " +
               " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
           case _ =>
-            throw new StateChangeFailedException("None of the reassigned replicas for partition " +
+            throw new NoReplicaOnlineException("None of the reassigned replicas for partition " +
               "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
         }
     }

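The OfflinePartitionLeaderSelector hunks above mostly rename liveAssignedReplicasToThisPartition, but the rule they implement is worth restating: prefer a live broker from the ISR, and only fall back to a live assigned replica outside the ISR (an unclean election, with potential data loss) when no ISR member is alive. Below is a minimal sketch of that rule over simplified inputs; the real selector also bumps the leader epoch and zkVersion and throws NoReplicaOnlineException when nothing is alive, and all names here are illustrative:

object OfflineLeaderSelectionSketch extends App {
  // Returns the chosen leader, or None when no assigned replica is alive at all.
  def selectLeader(assignedReplicas: Seq[Int], isr: Seq[Int], liveBrokers: Set[Int]): Option[Int] = {
    val liveAssignedReplicas = assignedReplicas.filter(liveBrokers.contains)
    val liveBrokersInIsr = isr.filter(liveBrokers.contains)
    if (liveBrokersInIsr.nonEmpty)
      Some(liveBrokersInIsr.head)        // clean election: the leader stays within the ISR
    else
      liveAssignedReplicas.headOption    // unclean election: potential data loss
  }

  println(selectLeader(assignedReplicas = Seq(1, 2, 3), isr = Seq(2, 3), liveBrokers = Set(1, 2))) // Some(2), clean
  println(selectLeader(assignedReplicas = Seq(1, 2, 3), isr = Seq(2, 3), liveBrokers = Set(1)))    // Some(1), unclean
}
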
http://git-wip-us.apache.org/repos/asf/kafka/blob/167acb83/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index ac4262a..487d4c8 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -26,6 +26,8 @@ import kafka.utils.{Logging, ZkUtils}
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.apache.log4j.Logger
+import kafka.controller.Callbacks.CallbackBuilder
+import kafka.utils.Utils._
 
 /**
  * This class represents the state machine for partitions. It defines the states that a partition can be in, and
@@ -44,8 +46,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   private val controllerId = controller.config.brokerId
   private val zkClient = controllerContext.zkClient
   var partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
-  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller.controllerContext, controller.sendRequest,
-    controllerId, controller.clientId)
+  val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
   private val hasStarted = new AtomicBoolean(false)
   private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
   this.logIdent = "[Partition state machine on Controller " + controllerId + "]: "
@@ -68,6 +69,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   // register topic and partition change listeners
   def registerListeners() {
     registerTopicChangeListener()
+    registerDeleteTopicListener()
   }
 
   /**
@@ -85,10 +87,13 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   def triggerOnlinePartitionStateChange() {
     try {
       brokerRequestBatch.newBatch()
-      // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state
-      for((topicAndPartition, partitionState) <- partitionState) {
+      // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
+      // that belong to topics to be deleted
+      for((topicAndPartition, partitionState) <- partitionState
+          if(!controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic))) {
         if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
-          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector)
+          handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
+                            (new CallbackBuilder).build)
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     } catch {
@@ -97,18 +102,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     }
   }
 
+  def partitionsInState(state: PartitionState): Set[TopicAndPartition] = {
+    partitionState.filter(p => p._2 == state).keySet
+  }
+
   /**
    * This API is invoked by the partition change zookeeper listener
    * @param partitions   The list of partitions that need to be transitioned to the target state
    * @param targetState  The state that the partitions should be moved to
    */
   def handleStateChanges(partitions: Set[TopicAndPartition], targetState: PartitionState,
-                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector) {
+                         leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector,
+                         callbacks: Callbacks = (new CallbackBuilder).build) {
     info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
     try {
       brokerRequestBatch.newBatch()
       partitions.foreach { topicAndPartition =>
-        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector)
+        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
       }
       brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement)
     }catch {
@@ -131,7 +141,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
    * --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
    *
-   * NewPartition,OnlinePartition -> OfflinePartition
+   * NewPartition,OnlinePartition,OfflinePartition -> OfflinePartition
    * --nothing other than marking partition state as Offline
    *
    * OfflinePartition -> NonExistentPartition
@@ -141,7 +151,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
    * @param targetState The end state that the partition should be moved to
    */
   private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
-                                leaderSelector: PartitionLeaderSelector) {
+                                leaderSelector: PartitionLeaderSelector,
+                                callbacks: Callbacks) {
     val topicAndPartition = TopicAndPartition(topic, partition)
     if (!hasStarted.get)
       throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
@@ -178,7 +189,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
            // post: partition has a leader
         case OfflinePartition =>
           // pre: partition should be in New or Online state
-          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
+          assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
           // should be called when the leader for a partition is no longer alive
           stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from Online to Offline"
                                     .format(controllerId, controller.epoch, topicAndPartition))
@@ -354,6 +365,10 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
     zkClient.subscribeDataChanges(ZkUtils.getTopicPath(topic), new AddPartitionsListener(topic))
   }
 
+  private def registerDeleteTopicListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.DeleteTopicsPath, new DeleteTopicsListener())
+  }
+
   private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
     val topicAndPartition = TopicAndPartition(topic, partition)
     ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
@@ -373,7 +388,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, children : java.util.List[String]) {
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         if (hasStarted.get) {
           try {
             val currentChildren = {
@@ -383,7 +398,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
             }
             val newTopics = currentChildren -- controllerContext.allTopics
             val deletedTopics = controllerContext.allTopics -- currentChildren
-            //        val deletedPartitionReplicaAssignment = replicaAssignment.filter(p => deletedTopics.contains(p._1._1))
             controllerContext.allTopics = currentChildren
 
             val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
@@ -397,12 +411,62 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
           } catch {
             case e: Throwable => error("Error while handling new topic", e )
           }
-          // TODO: kafka-330  Handle deleted topics
         }
       }
     }
   }
 
+  /**
+   * Topic deletion involves the following operations -
+   * 1. Add the topic to be deleted to the delete topics cache, only if the topic exists
+   * 2. If there are topics to be deleted, signal the delete topic thread
+   */
+  class DeleteTopicsListener() extends IZkChildListener with Logging {
+    this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
+    val zkClient = controllerContext.zkClient
+
+    /**
+     * Invoked when a topic is being deleted
+     * @throws Exception On any error.
+     */
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, children : java.util.List[String]) {
+      inLock(controllerContext.controllerLock) {
+        var topicsToBeDeleted = {
+          import JavaConversions._
+          (children: Buffer[String]).toSet
+        }
+        debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
+        val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
+        if(nonExistentTopics.size > 0)
+          warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
+        topicsToBeDeleted --= nonExistentTopics
+        if(topicsToBeDeleted.size > 0) {
+          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
+          // add topic to deletion list
+          controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
+          // halt if other state changes are in progress
+          topicsToBeDeleted.foreach { topic =>
+            val preferredReplicaElectionInProgress =
+              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
+            val partitionReassignmentInProgress =
+              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
+            if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
+              controller.deleteTopicManager.haltTopicDeletion(Set(topic))
+          }
+        }
+      }
+    }
+
+    /**
+     * @throws Exception On any error.
+     */
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+    }
+  }
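
The DeleteTopicsListener above enqueues every existing topic found under the delete-topics path, but halts deletion for topics that are in the middle of a preferred replica election or a partition reassignment. A minimal sketch of that classification as a pure function over plain sets; the Classification case class and the classify name are illustrative assumptions, not the listener's API:

object DeleteTopicEligibilitySketch extends App {
  case class Classification(nonExistent: Set[String], halted: Set[String], deletable: Set[String])

  def classify(requested: Set[String],
               allTopics: Set[String],
               topicsUnderPreferredElection: Set[String],
               topicsBeingReassigned: Set[String]): Classification = {
    val nonExistent = requested.filterNot(allTopics.contains)  // ignored with a warning
    val existing = requested -- nonExistent                    // all of these are enqueued for deletion
    val halted = existing.filter(t =>                          // enqueued, but deletion is paused for now
      topicsUnderPreferredElection.contains(t) || topicsBeingReassigned.contains(t))
    Classification(nonExistent, halted, existing -- halted)
  }

  println(classify(requested = Set("a", "b", "c"),
                   allTopics = Set("a", "b"),
                   topicsUnderPreferredElection = Set("b"),
                   topicsBeingReassigned = Set.empty))
  // Classification(Set(c),Set(b),Set(a))
}
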
 
   class AddPartitionsListener(topic: String) extends IZkDataListener with Logging {
 
@@ -410,15 +474,21 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
 
     @throws(classOf[Exception])
     def handleDataChange(dataPath : String, data: Object) {
-      controllerContext.controllerLock synchronized {
+      inLock(controllerContext.controllerLock) {
         try {
           info("Add Partition triggered " + data.toString + " for path " + dataPath)
           val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
-          val partitionsRemainingToBeAdded = partitionReplicaAssignment.filter(p =>
+          val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
             !controllerContext.partitionReplicaAssignment.contains(p._1))
-          info("New partitions to be added [%s]".format(partitionsRemainingToBeAdded))
-          if (partitionsRemainingToBeAdded.size > 0)
-            controller.onNewPartitionCreation(partitionsRemainingToBeAdded.keySet.toSet)
+          if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
+            error("Skipping adding partitions %s for topic %s since it is currently being deleted"
+                  .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
+          else {
+            if (partitionsToBeAdded.size > 0) {
+              info("New partitions to be added %s".format(partitionsToBeAdded))
+              controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
+            }
+          }
         } catch {
           case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
         }