Posted to commits@kafka.apache.org by ju...@apache.org on 2014/05/06 19:38:43 UTC

git commit: kafka-1397; delete topic is not working; patched by Timothy Chen; reviewed by Neha Narkhede and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 3f7a9061b -> 44c39c4ea


kafka-1397; delete topic is not working; patched by Timothy Chen; reviewed by Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/44c39c4e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/44c39c4e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/44c39c4e

Branch: refs/heads/trunk
Commit: 44c39c4ea48f365445b8c08ce57d0f16b15f0d0b
Parents: 3f7a906
Author: Timothy Chen <tn...@gmail.com>
Authored: Tue May 6 10:36:45 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue May 6 10:37:49 2014 -0700

----------------------------------------------------------------------
 .../src/main/scala/kafka/admin/AdminUtils.scala |  18 +-
 .../controller/ControllerChannelManager.scala   |  14 +-
 .../kafka/controller/KafkaController.scala      |   8 +-
 .../kafka/controller/ReplicaStateMachine.scala  |  10 +-
 .../kafka/controller/TopicDeletionManager.scala |  60 +-
 core/src/main/scala/kafka/log/LogManager.scala  |   1 -
 .../scala/kafka/server/ReplicaManager.scala     |  11 +-
 .../scala/kafka/utils/ShutdownableThread.scala  |  27 +-
 .../unit/kafka/admin/DeleteTopicTest.scala      | 612 ++++++++-----------
 .../kafka/server/DynamicConfigChangeTest.scala  |  17 +-
 .../unit/kafka/server/ServerShutdownTest.scala  |   2 -
 .../test/scala/unit/kafka/utils/TestUtils.scala |   2 +
 12 files changed, 376 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 36ddeb4..b5d8714 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -87,7 +87,17 @@ object AdminUtils extends Logging {
     ret.toMap
   }
 
-  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "") {
+
+  /**
+   * Add partitions to an existing topic, with an optional manual replica assignment
+   *
+   * @param zkClient Zookeeper client
+   * @param topic Topic to add the partitions to
+   * @param numPartitions Total number of partitions the topic should have after the operation
+   * @param replicaAssignmentStr Manual replica assignment, or "" to assign replicas automatically
+   * @param checkBrokerAvailable Whether to verify that the assigned replica brokers are available; disabled only in tests
+   */
+  def addPartitions(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicaAssignmentStr: String = "", checkBrokerAvailable: Boolean = true) {
     val existingPartitionsReplicaList = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
     if (existingPartitionsReplicaList.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
@@ -102,7 +112,7 @@ object AdminUtils extends Logging {
     val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
       AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
     else
-      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size)
+      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
 
     // check if manual assignment has the right replication factor
     val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
@@ -117,7 +127,7 @@ object AdminUtils extends Logging {
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, partitionReplicaList, update = true)
   }
 
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int): Map[Int, List[Int]] = {
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[Int], startPartitionId: Int, checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
     var partitionList = replicaAssignmentList.split(",")
     val ret = new mutable.HashMap[Int, List[Int]]()
     var partitionId = startPartitionId
@@ -128,7 +138,7 @@ object AdminUtils extends Logging {
         throw new AdminOperationException("replication factor must be larger than 0")
       if (brokerList.size != brokerList.toSet.size)
         throw new AdminOperationException("duplicate brokers in replica assignment: " + brokerList)
-      if (!brokerList.toSet.subsetOf(availableBrokerList))
+      if (checkBrokerAvailable && !brokerList.toSet.subsetOf(availableBrokerList))
         throw new AdminOperationException("some specified brokers not available. specified brokers: " + brokerList.toString +
           "available broker:" + availableBrokerList.toString)
       ret.put(partitionId, brokerList.toList)

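The new checkBrokerAvailable flag only affects the validation step of manual assignment parsing. As a minimal standalone sketch (illustrative names, not the Kafka source itself): an assignment string such as "0:1:2,0:1:2" becomes a partitionId -> replica-list map, numbered from startPartitionId, and broker availability is validated only when the flag is true.

    import scala.collection.mutable

    object ReplicaAssignmentSketch {
      // Parse "b1:b2:...,b1:b2:..." into partitionId -> List(brokerIds),
      // numbering partitions from startPartitionId, as addPartitions does.
      def parse(assignment: String,
                availableBrokers: Set[Int],
                startPartitionId: Int,
                checkBrokerAvailable: Boolean = true): Map[Int, List[Int]] = {
        val ret = new mutable.HashMap[Int, List[Int]]()
        assignment.split(",").zipWithIndex.foreach { case (entry, i) =>
          val brokers = entry.split(":").map(_.trim.toInt).toList
          if (brokers.isEmpty)
            throw new IllegalArgumentException("replication factor must be larger than 0")
          if (brokers.size != brokers.toSet.size)
            throw new IllegalArgumentException("duplicate brokers in replica assignment: " + brokers)
          // the flag lets tests assign replicas to brokers that are not currently registered
          if (checkBrokerAvailable && !brokers.toSet.subsetOf(availableBrokers))
            throw new IllegalArgumentException("some specified brokers not available: " + brokers)
          ret.put(startPartitionId + i, brokers)
        }
        ret.toMap
      }
    }

    // ReplicaAssignmentSketch.parse("0:1:2,0:1:2", Set(0, 1, 2), startPartitionId = 1)
    // => Map(1 -> List(0, 1, 2), 2 -> List(0, 1, 2))
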
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 919aeb2..8763968 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -206,13 +206,17 @@ class ControllerBrokerRequestBatch(controller: KafkaController) extends  Logging
   def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
                                        leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch,
                                        replicas: Seq[Int], callback: (RequestOrResponse) => Unit = null) {
-    brokerIds.filter(b => b >= 0).foreach { brokerId =>
-      leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
-      leaderAndIsrRequestMap(brokerId).put((topic, partition),
-        PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
+    val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partition)
+
+    brokerIds.filter(b => b >= 0).foreach {
+      brokerId =>
+        leaderAndIsrRequestMap.getOrElseUpdate(brokerId, new mutable.HashMap[(String, Int), PartitionStateInfo])
+        leaderAndIsrRequestMap(brokerId).put((topic, partition),
+          PartitionStateInfo(leaderIsrAndControllerEpoch, replicas.toSet))
     }
+
     addUpdateMetadataRequestForBrokers(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
-                                       Set(TopicAndPartition(topic, partition)))
+                                       Set(topicAndPartition))
   }
 
   def addStopReplicaRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int, deletePartition: Boolean,

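The change above is mostly a tidy-up (the TopicAndPartition is hoisted and reused), but the underlying per-broker batching idiom is worth spelling out. A reduced sketch with simplified types, not Kafka's actual classes:

    import scala.collection.mutable

    object RequestBatchSketch {
      // stand-in for PartitionStateInfo
      case class StateInfo(epoch: Int, replicas: Set[Int])

      // each broker id maps to its own (topic, partition) -> state map,
      // created lazily on first use
      val leaderAndIsrRequestMap =
        mutable.HashMap.empty[Int, mutable.HashMap[(String, Int), StateInfo]]

      def addRequestForBrokers(brokerIds: Seq[Int], topic: String,
                               partition: Int, state: StateInfo): Unit =
        // negative ids (e.g. "no leader") are filtered out up front, as above
        brokerIds.filter(_ >= 0).foreach { brokerId =>
          leaderAndIsrRequestMap
            .getOrElseUpdate(brokerId, mutable.HashMap.empty)
            .put((topic, partition), state)
        }
    }
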
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 933de9d..401bf1e 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -337,11 +337,13 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    * required to clean up internal controller data structures
    */
   def onControllerResignation() {
+    if (deleteTopicManager != null)
+      deleteTopicManager.shutdown()
+
     inLock(controllerContext.controllerLock) {
       if (config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
-      if (deleteTopicManager != null)
-        deleteTopicManager.shutdown()
+
       Utils.unregisterMBean(KafkaController.MBeanName)
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
@@ -644,8 +646,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def shutdown() = {
     inLock(controllerContext.controllerLock) {
       isRunning = false
-      onControllerResignation()
     }
+    onControllerResignation()
   }
 
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {

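The reordering here matters: the DeleteTopicsThread acquires controllerContext.controllerLock inside doWork(), so awaiting its shutdown while holding that lock can deadlock. Moving deleteTopicManager.shutdown() ahead of the inLock block, and calling onControllerResignation() only after releasing the lock in shutdown(), avoids that. A minimal sketch of the hazard, with illustrative names:

    import java.util.concurrent.locks.ReentrantLock

    object ShutdownOrderingSketch {
      val lock = new ReentrantLock()
      @volatile var running = true

      val worker = new Thread(new Runnable {
        def run(): Unit = {
          while (running) {
            lock.lock() // the worker needs the same lock to do its work
            try { /* do work */ } finally { lock.unlock() }
          }
        }
      })

      def shutdownDeadlockProne(): Unit = {
        lock.lock()
        try {
          running = false
          worker.join() // can hang forever: the worker may be blocked on lock.lock()
        } finally { lock.unlock() }
      }

      def shutdownSafe(): Unit = {
        running = false // stop the worker while NOT holding the lock
        worker.join()   // the worker can still acquire the lock and exit cleanly
        lock.lock()
        try { /* now clean up shared state under the lock */ } finally { lock.unlock() }
      }
    }
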
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 0e47dac..2f0f29d 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -233,8 +233,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
                   case Some(updatedLeaderIsrAndControllerEpoch) =>
                     // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
                     val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
-                      topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
+                    if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
+                      brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
+                        topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
+                    }
                     replicaState.put(partitionAndReplica, OfflineReplica)
                     stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                       .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
@@ -275,6 +277,10 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
     replicaState.filter(r => r._1.topic.equals(topic) && r._2 == state).keySet
   }
 
+  def isAnyReplicaInState(topic: String, state: ReplicaState): Boolean = {
+    replicaState.exists(r => r._1.topic.equals(topic) && r._2 == state)
+  }
+
   def replicasInDeletionStates(topic: String): Set[PartitionAndReplica] = {
     val deletionStates = Set(ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible)
     replicaState.filter(r => r._1.topic.equals(topic) && deletionStates.contains(r._2)).keySet

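isAnyReplicaInState is a cheaper variant of replicasInState(topic, state).nonEmpty: exists short-circuits on the first matching entry instead of materialising the whole filtered set. An illustrative comparison over a simplified state map (not the real PartitionAndReplica type):

    // key: (topic, partition, replicaId), value: state name
    val replicaState = Map(
      ("test", 0, 0) -> "OfflineReplica",
      ("test", 0, 1) -> "ReplicaDeletionIneligible")

    // builds the full filtered collection before testing emptiness
    val viaFilter = replicaState
      .filter { case ((topic, _, _), state) => topic == "test" && state == "ReplicaDeletionIneligible" }
      .nonEmpty

    // stops scanning at the first match
    val viaExists = replicaState
      .exists { case ((topic, _, _), state) => topic == "test" && state == "ReplicaDeletionIneligible" }
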
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index e4bc243..219c413 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -69,6 +69,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 class TopicDeletionManager(controller: KafkaController,
                            initialTopicsToBeDeleted: Set[String] = Set.empty,
                            initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
+  this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
   val controllerContext = controller.controllerContext
   val partitionStateMachine = controller.partitionStateMachine
   val replicaStateMachine = controller.replicaStateMachine
@@ -81,14 +82,12 @@ class TopicDeletionManager(controller: KafkaController,
   val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
   var deleteTopicsThread: DeleteTopicsThread = null
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
-  @volatile var isShuttingDown = false
 
   /**
    * Invoked at the end of new controller initiation
    */
   def start() {
-    if(isDeleteTopicEnabled) {
-      isShuttingDown = false
+    if (isDeleteTopicEnabled) {
       deleteTopicsThread = new DeleteTopicsThread()
       deleteTopicStateChanged.set(true)
       deleteTopicsThread.start()
@@ -96,17 +95,18 @@ class TopicDeletionManager(controller: KafkaController,
   }
 
   /**
-   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
+   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
    */
   def shutdown() {
-    if(isDeleteTopicEnabled) {
-      isShuttingDown = true
+    // Only allow one shutdown to go through
+    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
+      // Resume the topic deletion so it doesn't block on the condition
       resumeTopicDeletionThread()
-      deleteTopicsThread.shutdown()
+      // Await delete topic thread to exit
+      deleteTopicsThread.awaitShutdown()
       topicsToBeDeleted.clear()
       partitionsToBeDeleted.clear()
       topicsIneligibleForDeletion.clear()
-      isShuttingDown = false
     }
   }
 
@@ -194,6 +194,13 @@ class TopicDeletionManager(controller: KafkaController,
       false
   }
 
+  def isPartitionToBeDeleted(topicAndPartition: TopicAndPartition) = {
+    if(isDeleteTopicEnabled) {
+      partitionsToBeDeleted.contains(topicAndPartition)
+    } else
+      false
+  }
+
   def isTopicQueuedUpForDeletion(topic: String): Boolean = {
     if(isDeleteTopicEnabled) {
       topicsToBeDeleted.contains(topic)
@@ -207,8 +214,8 @@ class TopicDeletionManager(controller: KafkaController,
    */
   private def awaitTopicDeletionNotification() {
     inLock(deleteLock) {
-      while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) {
-        info("Waiting for signal to start or continue topic deletion")
+      while(deleteTopicsThread.isRunning.get() && !deleteTopicStateChanged.compareAndSet(true, false)) {
+        debug("Waiting for signal to start or continue topic deletion")
         deleteTopicsCond.await()
       }
     }
@@ -257,6 +264,8 @@ class TopicDeletionManager(controller: KafkaController,
   private def markTopicForDeletionRetry(topic: String) {
     // reset replica states from ReplicaDeletionIneligible to OfflineReplica
     val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
+    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
+      .format(topic, failedReplicas.mkString(",")))
     controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
   }
 
@@ -281,7 +290,10 @@ class TopicDeletionManager(controller: KafkaController,
 
   /**
    * This callback is invoked by the DeleteTopics thread with the list of topics to be deleted
-   * It invokes the delete partition callback for all partitions of a topic
+   * It invokes the delete partition callback for all partitions of a topic.
+   * The updateMetadataRequest is also going to set the leader for the topics being deleted to
+   * {@link LeaderAndIsr#LeaderDuringDelete}. This lets each broker know that this topic is being deleted so that it
+   * can be removed from its caches.
    */
   private def onTopicDeletion(topics: Set[String]) {
     info("Topic deletion callback for %s".format(topics.mkString(",")))
@@ -305,10 +317,9 @@ class TopicDeletionManager(controller: KafkaController,
    * As part of starting deletion, all replicas are moved to the ReplicaDeletionStarted state where the controller sends
    * the replicas a StopReplicaRequest (delete=true)
    * This callback does the following things -
-   * 1. Send metadata request to all brokers excluding the topics to be deleted
-   * 2. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
+   * 1. Move all dead replicas directly to ReplicaDeletionIneligible state. Also mark the respective topics ineligible
    *    for deletion if some replicas are dead since it won't complete successfully anyway
-   * 3. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
+   * 2. Move all alive replicas to ReplicaDeletionStarted state so they can be deleted successfully
    *@param replicasForTopicsToBeDeleted
    */
   private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
@@ -324,8 +335,10 @@ class TopicDeletionManager(controller: KafkaController,
       debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
       controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
         new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
-      if(deadReplicasForTopic.size > 0)
+      if(deadReplicasForTopic.size > 0) {
+        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
         markTopicIneligibleForDeletion(Set(topic))
+      }
     }
   }
 
@@ -365,12 +378,12 @@ class TopicDeletionManager(controller: KafkaController,
     }
   }
 
-  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) {
+  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
     val zkClient = controllerContext.zkClient
     override def doWork() {
       awaitTopicDeletionNotification()
 
-      if(!isRunning.get)
+      if (!isRunning.get)
         return
 
       inLock(controllerContext.controllerLock) {
@@ -395,13 +408,12 @@ class TopicDeletionManager(controller: KafkaController,
                 partitions.mkString(","), topic))
             } else {
               // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
-              // TopicDeletionSuccessful. That means, there is at least one failed replica, which means topic deletion
-              // should be retried
-              val replicasInTopicDeletionFailedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
-              // mark topic for deletion retry
-              markTopicForDeletionRetry(topic)
-              info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
-                .format(topic, replicasInTopicDeletionFailedState.mkString(",")))
+              // TopicDeletionSuccessful. That means that either the given topic hasn't initiated deletion
+              // or there is at least one failed replica (which means topic deletion should be retried).
+              if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
+                // mark topic for deletion retry
+                markTopicForDeletionRetry(topic)
+              }
             }
           }
           // Try delete topic if it is eligible for deletion.

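The shutdown change above replaces the ad-hoc isShuttingDown flag with the thread's own isRunning flag plus the initiateShutdown()/awaitShutdown() split. A reduced, self-contained sketch of the wait/signal protocol (illustrative names; note that the compareAndSet both tests and consumes the pending signal):

    import java.util.concurrent.atomic.AtomicBoolean
    import java.util.concurrent.locks.ReentrantLock

    class DeletionSignal {
      private val lock = new ReentrantLock()
      private val cond = lock.newCondition()
      private val stateChanged = new AtomicBoolean(false)
      val isRunning = new AtomicBoolean(true)

      def awaitWork(): Unit = {
        lock.lock()
        try {
          // wait while still running and no pending signal to consume
          while (isRunning.get() && !stateChanged.compareAndSet(true, false))
            cond.await()
        } finally lock.unlock()
      }

      def signalWork(): Unit = {
        lock.lock()
        try {
          stateChanged.set(true)
          cond.signal()
        } finally lock.unlock()
      }

      def initiateShutdown(): Unit = {
        isRunning.set(false)
        signalWork() // wake the worker so it observes isRunning == false and exits
      }
    }

This mirrors why shutdown() must call resumeTopicDeletionThread() before awaitShutdown(): without the signal, the worker could stay parked on the condition forever.
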
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ac67f08..ab72cff 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -44,7 +44,6 @@ class LogManager(val logDirs: Array[File],
                  val retentionCheckMs: Long,
                  scheduler: Scheduler,
                  private val time: Time) extends Logging {
-
   val RecoveryPointCheckpointFile = "recovery-point-offset-checkpoint"
   val LockFile = ".lock"
   val InitialTaskDelayMs = 30*1000

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 11c20ce..6a56a77 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -125,7 +125,16 @@ class ReplicaManager(val config: KafkaConfig,
           if (removedPartition != null)
             removedPartition.delete() // this will delete the local log
         }
-      case None => //do nothing if replica no longer exists. This can happen during delete topic retries
+      case None =>
+        // Delete the log and its folders in case the replica manager no longer holds them.
+        // This can happen when a topic is deleted while the broker is down and later recovers.
+        if(deletePartition) {
+          val topicAndPartition = TopicAndPartition(topic, partitionId)
+
+          if(logManager.getLog(topicAndPartition).isDefined) {
+            logManager.deleteLog(topicAndPartition)
+          }
+        }
         stateChangeLogger.trace("Broker %d ignoring stop replica (delete=%s) for partition [%s,%d] as replica doesn't exist on broker"
           .format(localBrokerId, deletePartition, topic, partitionId))
     }

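The new None branch handles a broker that was down while its topic was deleted: on restart the broker may still have the log on disk even though the replica manager no longer tracks the partition, so a StopReplicaRequest with delete=true must still remove it. A condensed sketch with simplified types (not the real LogManager API):

    object OrphanedLogSketch {
      case class TopicAndPartition(topic: String, partition: Int)

      trait LogManagerLike {
        def getLog(tp: TopicAndPartition): Option[AnyRef]
        def deleteLog(tp: TopicAndPartition): Unit
      }

      // invoked when no replica is tracked for (topic, partitionId)
      def stopOrphanedReplica(logManager: LogManagerLike,
                              topic: String, partitionId: Int,
                              deletePartition: Boolean): Unit = {
        val tp = TopicAndPartition(topic, partitionId)
        if (deletePartition && logManager.getLog(tp).isDefined)
          logManager.deleteLog(tp) // remove the stray on-disk log and its folders
      }
    }
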
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/main/scala/kafka/utils/ShutdownableThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
index cf8adc9..fc226c8 100644
--- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala
+++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala
@@ -27,20 +27,29 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
   private val shutdownLatch = new CountDownLatch(1)
 
+  def shutdown() = {
+    initiateShutdown()
+    awaitShutdown()
+  }
 
-  def shutdown(): Unit = {
-    info("Shutting down")
-    isRunning.set(false)
-    if (isInterruptible)
-      interrupt()
-    shutdownLatch.await()
-    info("Shutdown completed")
+  def initiateShutdown(): Boolean = {
+    if(isRunning.compareAndSet(true, false)) {
+      info("Shutting down")
+      isRunning.set(false)
+      if (isInterruptible)
+        interrupt()
+      true
+    } else
+      false
   }
 
     /**
-   * After calling shutdown(), use this API to wait until the shutdown is complete
+   * After calling initiateShutdown(), use this API to wait until the shutdown is complete
    */
-  def awaitShutdown(): Unit = shutdownLatch.await()
+  def awaitShutdown(): Unit = {
+    shutdownLatch.await()
+    info("Shutdown completed")
+  }
 
   def doWork(): Unit
 

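Splitting shutdown() into initiateShutdown() and awaitShutdown() is what lets TopicDeletionManager.shutdown() flip the flag, wake its blocked thread, and only then wait, which the old all-in-one shutdown() could not do. The compareAndSet also makes repeated shutdowns idempotent (and makes the extra isRunning.set(false) in the patch redundant, though harmless). A self-contained sketch of the pattern:

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicBoolean

    abstract class TwoPhaseThread(name: String) extends Thread(name) {
      val isRunning = new AtomicBoolean(true)
      private val shutdownLatch = new CountDownLatch(1)

      def doWork(): Unit

      override def run(): Unit = {
        try {
          while (isRunning.get()) doWork()
        } finally {
          shutdownLatch.countDown() // released exactly once, when the loop exits
        }
      }

      // returns true only for the caller that actually initiated the shutdown
      def initiateShutdown(): Boolean = isRunning.compareAndSet(true, false)

      def awaitShutdown(): Unit = shutdownLatch.await()

      def shutdown(): Unit = { initiateShutdown(); awaitShutdown() }
    }
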
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 9c29e14..a821d60 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -33,376 +33,286 @@ import kafka.api.PartitionOffsetRequestInfo
 
 class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
 
-  /*  Temporarily disable all tests until delete topic is fixed.
-   *  Add a fake test to let junit tests pass.
-   */
   @Test
-  def testFake() {
+  def testDeleteTopicWithAllAliveReplicas() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    verifyTopicDeletion(topic, servers)
+    servers.foreach(_.shutdown())
   }
 
+  @Test
+  def testResumeDeleteTopicWithRecoveredFollower() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // shut down one follower replica
+    val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    follower.shutdown()
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    // check if all replicas but the one that is shut down have deleted the log
+    TestUtils.waitUntilTrue(() =>
+      servers.filter(s => s.config.brokerId != follower.config.brokerId)
+        .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), "Replicas 0,1 have not deleted log.")
+    // ensure topic deletion is halted
+    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+      "Admin path /admin/delete_topic/test path deleted even when a follower replica is down")
+    // restart follower replica
+    follower.startup()
+    verifyTopicDeletion(topic, servers)
+    servers.foreach(_.shutdown())
+  }
 
-  /*
-    @Test
-    def testDeleteTopicWithAllAliveReplicas() {
-      val topicAndPartition = TopicAndPartition("test", 0)
-      val topic = topicAndPartition.topic
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      verifyTopicDeletion(topic, servers)
-      servers.foreach(_.shutdown())
-    }
+  @Test
+  def testResumeDeleteTopicOnControllerFailover() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    val controllerId = ZkUtils.getController(zkClient)
+    val controller = servers.filter(s => s.config.brokerId == controllerId).head
+    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get && s.config.brokerId != controllerId).last
+    follower.shutdown()
 
-    @Test
-    def testResumeDeleteTopicWithRecoveredFollower() {
-      val topicAndPartition = TopicAndPartition("test", 0)
-      val topic = topicAndPartition.topic
-      val servers = createTestTopicAndCluster(topic)
-      // shut down one follower replica
-      val leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-      assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-      val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-      follower.shutdown()
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // check if all replicas but the one that is shut down has deleted the log
-      assertTrue("Replicas 0,1 have not deleted log in 1000ms", TestUtils.waitUntilTrue(() =>
-        servers.filter(s => s.config.brokerId != follower.config.brokerId)
-          .foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty), 1000))
-      // ensure topic deletion is halted
-      assertTrue("Admin path /admin/delete_topic/test path deleted in 1000ms even when a follower replica is down",
-        TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500))
-      // restart follower replica
-      follower.startup()
-      verifyTopicDeletion(topic, servers)
-      servers.foreach(_.shutdown())
-    }
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    // shut down the controller to trigger controller failover during delete topic
+    controller.shutdown()
 
-    @Test
-    def testResumeDeleteTopicOnControllerFailover() {
-      val topicAndPartition = TopicAndPartition("test", 0)
-      val topic = topicAndPartition.topic
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // shut down the controller to trigger controller failover during delete topic
-      val controllerId = ZkUtils.getController(zkClient)
-      val controller = servers.filter(s => s.config.brokerId == controllerId).head
-      controller.shutdown()
-      // ensure topic deletion is halted
-      assertTrue("Admin path /admin/delete_topic/test path deleted in 500ms even when a replica is down",
-        TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 500))
-      // restart follower replica
-      controller.startup()
-      // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-      assertTrue("Admin path /admin/delete_topic/test path not deleted in 4000ms even after a follower replica is restarted",
-        TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 4000))
-      assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted",
-        TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100))
-      // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
-      assertTrue("Replica logs not deleted after delete topic is complete",
-        servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
-      servers.foreach(_.shutdown())
-    }
+    // ensure topic deletion is halted
+    TestUtils.waitUntilTrue(() => ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+      "Admin path /admin/delete_topic/test path deleted even when a replica is down")
 
-    @Test
-    def testRequestHandlingDuringDeleteTopic() {
-      val topicAndPartition = TopicAndPartition("test", 0)
-      val topic = topicAndPartition.topic
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // shut down one follower replica
-      var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-      assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-      val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
-      follower.shutdown()
-      // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic
-      val props1 = new Properties()
-      props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
-      props1.put("serializer.class", "kafka.serializer.StringEncoder")
-      props1.put("request.required.acks", "1")
-      val producerConfig1 = new ProducerConfig(props1)
-      val producer1 = new Producer[String, String](producerConfig1)
-      try{
-        producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
-        fail("Test should fail because the topic is being deleted")
-      } catch {
-        case e: FailedToSendMessageException =>
-        case oe: Throwable => fail("fails with exception", oe)
-      } finally {
-        producer1.close()
-      }
-      // test if fetch requests fail during delete topic
-      servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server =>
-        val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "")
+    controller.startup()
+    follower.startup()
+
+    verifyTopicDeletion(topic, servers)
+    servers.foreach(_.shutdown())
+  }
+
+  @Test
+  def testRequestHandlingDuringDeleteTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    // shut down one follower replica
+    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    follower.shutdown()
+    // test if produce requests are failed with UnknownTopicOrPartitionException during delete topic
+    val props1 = new Properties()
+    props1.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+    props1.put("serializer.class", "kafka.serializer.StringEncoder")
+    props1.put("request.required.acks", "1")
+    val producerConfig1 = new ProducerConfig(props1)
+    val producer1 = new Producer[String, String](producerConfig1)
+    try {
+      producer1.send(new KeyedMessage[String, String](topic, "test", "test1"))
+      fail("Test should fail because the topic is being deleted")
+    } catch {
+      case e: FailedToSendMessageException =>
+      case oe: Throwable => fail("fails with exception", oe)
+    } finally {
+      producer1.close()
+    }
+    // test if fetch requests fail during delete topic
+    val availableServers: Seq[KafkaServer] = servers.filter(s => s.config.brokerId != follower.config.brokerId).toSeq
+    availableServers.foreach {
+      server =>
+        val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "")
         val request = new FetchRequestBuilder()
           .clientId("test-client")
           .addFetch(topic, 0, 0, 10000)
           .build()
         val fetched = consumer.fetch(request)
         val fetchResponse = fetched.data(topicAndPartition)
-        assertTrue("Fetch should fail with UnknownTopicOrPartitionCode", fetchResponse.error == ErrorMapping.UnknownTopicOrPartitionCode)
-      }
-      // test if offset requests fail during delete topic
-      servers.filter(s => s.config.brokerId != follower.config.brokerId).foreach { server =>
-        val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64*1024, "")
+        assertEquals("Fetch should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.error)
+    }
+    // test if offset requests fail during delete topic
+    availableServers.foreach {
+      server =>
+        val consumer = new SimpleConsumer(server.config.hostName, server.config.port, 1000000, 64 * 1024, "")
         val offsetRequest = new OffsetRequest(Map(topicAndPartition -> new PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
         val offsetResponse = consumer.getOffsetsBefore(offsetRequest)
         val errorCode = offsetResponse.partitionErrorAndOffsets(topicAndPartition).error
-        assertTrue("Offset request should fail with UnknownTopicOrPartitionCode", errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
-      }
-      // restart follower replica
-      follower.startup()
-      verifyTopicDeletion(topic, servers)
-      servers.foreach(_.shutdown())
-    }
-
-    @Test
-    def testDeleteTopicDuringPreferredReplicaElection() {
-      val topic = "test"
-      val topicAndPartition = TopicAndPartition(topic, 0)
-      val servers = createTestTopicAndCluster(topic)
-      var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-      assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-      // shut down the controller to move the leader to a non preferred replica before delete topic
-      val preferredReplicaId = 0
-      val preferredReplica = servers.filter(s => s.config.brokerId == preferredReplicaId).head
-      preferredReplica.shutdown()
-      preferredReplica.startup()
-      val newLeaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 3000, leaderIdOpt)
-      assertTrue("New leader should be elected prior to delete topic", newLeaderIdOpt.isDefined)
-      // test preferred replica election
-      val preferredReplicaElection = new PreferredReplicaLeaderElectionCommand(zkClient, Set(topicAndPartition))
-      preferredReplicaElection.moveLeaderToPreferredReplica()
-      // start topic deletion during preferred replica election. This should halt topic deletion but eventually
-      // complete it successfully
-      AdminUtils.deleteTopic(zkClient, topic)
-      val newControllerId = ZkUtils.getController(zkClient)
-      val newController = servers.filter(s => s.config.brokerId == newControllerId).head
-      assertTrue("Preferred replica election should succeed after 1000ms", TestUtils.waitUntilTrue(() =>
-        !newController.kafkaController.controllerContext.partitionsUndergoingPreferredReplicaElection.contains(topicAndPartition), 1000))
-      verifyTopicDeletion(topic, servers)
-      servers.foreach(_.shutdown())
-    }
-
-    @Test
-    def testPartitionReassignmentDuringDeleteTopic() {
-      val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
-      val topic = "test"
-      val topicAndPartition = TopicAndPartition(topic, 0)
-      val brokerConfigs = TestUtils.createBrokerConfigs(4)
-      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-      // create brokers
-      val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
-      val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
-      // create the topic
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-      // wait until replica log is created on every broker
-      assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-        res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
-      var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-      assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
-      // the topic is being deleted
-      // reassign partition 0
-      val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
-      val newReplicas = Seq(1, 2, 3)
-      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
-      assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
-      // wait until reassignment is completed
-      TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
-      }, 1000)
-      val controllerId = ZkUtils.getController(zkClient)
-      val controller = servers.filter(s => s.config.brokerId == controllerId).head
-      assertFalse("Partition reassignment should fail",
-        controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
-      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
-      assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
-      verifyTopicDeletion(topic, servers)
-      allServers.foreach(_.shutdown())
-    }
-
-    @Test
-    def testDeleteTopicDuringPartitionReassignment() {
-      val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
-      val topic = "test"
-      val topicAndPartition = TopicAndPartition(topic, 0)
-      val brokerConfigs = TestUtils.createBrokerConfigs(4)
-      brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
-      // create brokers
-      val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
-      val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
-      // create the topic
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-      // wait until replica log is created on every broker
-      assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-        res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
-      var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
-      assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
-      // start partition reassignment at the same time right before delete topic. In this case, reassignment will succeed
-      // reassign partition 0
-      val newReplicas = Seq(1, 2, 3)
-      val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
-      assertTrue("Partition reassignment failed for test, 0", reassignPartitionsCommand.reassignPartitions())
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // wait until reassignment is completed
-      TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
-        ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentCompleted;
-      }, 1000)
-      val controllerId = ZkUtils.getController(zkClient)
-      val controller = servers.filter(s => s.config.brokerId == controllerId).head
-      assertFalse("Partition reassignment should complete",
-        controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
-      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
-      assertEquals("Partition should be reassigned to 1,2,3", newReplicas, assignedReplicas)
-      verifyTopicDeletion(topic, allServers)
-      allServers.foreach(_.shutdown())
+        assertEquals("Offset request should fail with UnknownTopicOrPartitionCode", ErrorMapping.UnknownTopicOrPartitionCode, errorCode)
     }
+    // restart follower replica
+    follower.startup()
+    verifyTopicDeletion(topic, availableServers)
+    servers.foreach(_.shutdown())
+  }
 
-    @Test
-    def testDeleteTopicDuringAddPartition() {
-      val topic = "test"
-      val servers = createTestTopicAndCluster(topic)
-      val newPartition = TopicAndPartition(topic, 1)
-      // add partitions to topic
-      AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // test if topic deletion is resumed
-     verifyTopicDeletion(topic, servers)
-      // verify that new partition doesn't exist on any broker either
-      assertTrue("Replica logs not for new partition [test,1] not deleted after delete topic is complete", TestUtils.waitUntilTrue(() =>
-        servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty), 1000))
-      servers.foreach(_.shutdown())
-    }
+  @Test
+  def testPartitionReassignmentDuringDeleteTopic() {
+    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+    val topic = "test"
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
+    // create brokers
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
+    // create the topic
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    // wait until replica log is created on every broker
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+      "Replicas for topic test not created.")
+    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    follower.shutdown()
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    // start partition reassignment at the same time right after delete topic. In this case, reassignment will fail since
+    // the topic is being deleted
+    // reassign partition 0
+    val oldAssignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+    val newReplicas = Seq(1, 2, 3)
+    val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, Map(topicAndPartition -> newReplicas))
+    assertTrue("Partition reassignment should fail for [test,0]", reassignPartitionsCommand.reassignPartitions())
+    // wait until reassignment is completed
+    TestUtils.waitUntilTrue(() => {
+      val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient).mapValues(_.newReplicas);
+      ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, topicAndPartition, newReplicas,
+        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == ReassignmentFailed;
+    }, "Partition reassignment shouldn't complete.")
+    val controllerId = ZkUtils.getController(zkClient)
+    val controller = servers.filter(s => s.config.brokerId == controllerId).head
+    assertFalse("Partition reassignment should fail",
+      controller.kafkaController.controllerContext.partitionsBeingReassigned.contains(topicAndPartition))
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, 0)
+    assertEquals("Partition should not be reassigned to 0, 1, 2", oldAssignedReplicas, assignedReplicas)
+    follower.startup()
+    verifyTopicDeletion(topic, servers)
+    allServers.foreach(_.shutdown())
+  }
 
-    @Test
-    def testAddPartitionDuringDeleteTopic() {
-      val topic = "test"
-      val topicAndPartition = TopicAndPartition(topic, 0)
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      // add partitions to topic
-      val newPartition = TopicAndPartition(topic, 1)
-      AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
-      verifyTopicDeletion(topic, servers)
-      // verify that new partition doesn't exist on any broker either
-      assertTrue("Replica logs not deleted after delete topic is complete",
-        servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
-      servers.foreach(_.shutdown())
-    }
+  @Test
+  def testDeleteTopicDuringAddPartition() {
+    val topic = "test"
+    val servers = createTestTopicAndCluster(topic)
+    var leaderIdOpt = ZkUtils.getLeaderForPartition(zkClient, topic, 0)
+    assertTrue("Leader should exist for partition [test,0]", leaderIdOpt.isDefined)
+    val follower = servers.filter(s => s.config.brokerId != leaderIdOpt.get).last
+    val newPartition = TopicAndPartition(topic, 1)
+    follower.shutdown()
+    // add partitions to topic
+    AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2", false)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    follower.startup()
+    // test if topic deletion is resumed
+    verifyTopicDeletion(topic, servers)
+    // verify that new partition doesn't exist on any broker either
+    TestUtils.waitUntilTrue(() =>
+      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty),
+      "Replica logs not for new partition [test,1] not deleted after delete topic is complete.")
+    servers.foreach(_.shutdown())
+  }
 
-    @Test
-    def testRecreateTopicAfterDeletion() {
-      val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
-      val topic = "test"
-      val topicAndPartition = TopicAndPartition(topic, 0)
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      verifyTopicDeletion(topic, servers)
-      // re-create topic on same replicas
-      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
-      // wait until leader is elected
-      val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-      assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
-      // check if all replica logs are created
-      assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-        res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
-      servers.foreach(_.shutdown())
-    }
+  @Test
+  def testAddPartitionDuringDeleteTopic() {
+    val topic = "test"
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    // add partitions to topic
+    val newPartition = TopicAndPartition(topic, 1)
+    AdminUtils.addPartitions(zkClient, topic, 2, "0:1:2,0:1:2")
+    verifyTopicDeletion(topic, servers)
+    // verify that new partition doesn't exist on any broker either
+    assertTrue("Replica logs not deleted after delete topic is complete",
+      servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(newPartition).isEmpty))
+    servers.foreach(_.shutdown())
+  }
 
-    @Test
-    def testTopicConfigChangesDuringDeleteTopic() {
-      val topic = "test"
-      val servers = createTestTopicAndCluster(topic)
-      val topicConfigs = new Properties()
-      topicConfigs.put("segment.ms", "1000000")
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      verifyTopicDeletion(topic, servers)
-      // make topic config changes
-      try {
-        AdminUtils.changeTopicConfig(zkClient, topic, topicConfigs)
-        fail("Should fail with AdminOperationException for topic doesn't exist")
-      } catch {
-        case e: AdminOperationException => // expected
-      }
-      servers.foreach(_.shutdown())
-    }
+  @Test
+  def testRecreateTopicAfterDeletion() {
+    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
+    val topic = "test"
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    verifyTopicDeletion(topic, servers)
+    // re-create topic on same replicas
+    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
+    // wait until leader is elected
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+    // check if all replica logs are created
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isDefined),
+      "Replicas for topic test not created.")
+    servers.foreach(_.shutdown())
+  }
 
-    @Test
-    def testAutoCreateAfterDeleteTopic() {
-      val topicAndPartition = TopicAndPartition("test", 0)
-      val topic = topicAndPartition.topic
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, topic)
-      verifyTopicDeletion(topic, servers)
-      // test if first produce request after topic deletion auto creates the topic
-      val props = new Properties()
-      props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
-      props.put("serializer.class", "kafka.serializer.StringEncoder")
-      props.put("producer.type", "sync")
-      props.put("request.required.acks", "1")
-      props.put("message.send.max.retries", "1")
-      val producerConfig = new ProducerConfig(props)
-      val producer = new Producer[String, String](producerConfig)
-      try{
-        producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
-      } catch {
-        case e: FailedToSendMessageException => fail("Topic should have been auto created")
-        case oe: Throwable => fail("fails with exception", oe)
-      }
-      // test the topic path exists
-      assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
-      // wait until leader is elected
-      val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-      assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
-      try {
-        producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
-      } catch {
-        case e: FailedToSendMessageException => fail("Topic should have been auto created")
-        case oe: Throwable => fail("fails with exception", oe)
-      } finally {
-        producer.close()
-      }
-      servers.foreach(_.shutdown())
+  @Test
+  def testAutoCreateAfterDeleteTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, topic)
+    verifyTopicDeletion(topic, servers)
+    // test if first produce request after topic deletion auto creates the topic
+    val props = new Properties()
+    props.put("metadata.broker.list", servers.map(s => s.config.hostName + ":" + s.config.port).mkString(","))
+    props.put("serializer.class", "kafka.serializer.StringEncoder")
+    props.put("producer.type", "sync")
+    props.put("request.required.acks", "1")
+    props.put("message.send.max.retries", "1")
+    val producerConfig = new ProducerConfig(props)
+    val producer = new Producer[String, String](producerConfig)
+    try {
+      producer.send(new KeyedMessage[String, String](topic, "test", "test1"))
+    } catch {
+      case e: FailedToSendMessageException => fail("Topic should have been auto created")
+      case oe: Throwable => fail("fails with exception", oe)
     }
+    // test the topic path exists
+    assertTrue("Topic not auto created", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // wait until leader is elected
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("New leader should be elected after re-creating topic test", leaderIdOpt.isDefined)
+    servers.foreach(_.shutdown())
+  }
 
-    @Test
-    def testDeleteNonExistingTopic() {
-      val topicAndPartition = TopicAndPartition("test", 0)
-      val topic = topicAndPartition.topic
-      val servers = createTestTopicAndCluster(topic)
-      // start topic deletion
-      AdminUtils.deleteTopic(zkClient, "test2")
-      // verify delete topic path for test2 is removed from zookeeper
-      verifyTopicDeletion("test2", servers)
-      // verify that topic test is untouched
-      assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-        res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
-      // test the topic path exists
-      assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
-      // topic test should have a leader
-      val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-      assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
-      servers.foreach(_.shutdown())
+  @Test
+  def testDeleteNonExistingTopic() {
+    val topicAndPartition = TopicAndPartition("test", 0)
+    val topic = topicAndPartition.topic
+    val servers = createTestTopicAndCluster(topic)
+    // start topic deletion
+    AdminUtils.deleteTopic(zkClient, "test2")
+    // verify delete topic path for test2 is removed from zookeeper
+    verifyTopicDeletion("test2", servers)
+    // verify that topic test is untouched
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+      "Replicas for topic test not created")
+    // test the topic path exists
+    assertTrue("Topic test mistakenly deleted", ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
+    // topic test should have a leader
+    val leaderIdOpt = TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+    assertTrue("Leader should exist for topic test", leaderIdOpt.isDefined)
+    servers.foreach(_.shutdown())
 
-    }
+  }
 
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
-    val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
+    val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
     val brokerConfigs = TestUtils.createBrokerConfigs(3)
     brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
@@ -411,21 +321,21 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker
-    assertTrue("Replicas for topic test not created in 1000ms", TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
-      res && server.getLogManager().getLog(topicAndPartition).isDefined), 1000))
+    TestUtils.waitUntilTrue(() => servers.foldLeft(true)((res, server) =>
+      res && server.getLogManager().getLog(topicAndPartition).isDefined),
+      "Replicas for topic test not created")
     servers
   }
 
   private def verifyTopicDeletion(topic: String, servers: Seq[KafkaServer]) {
     val topicAndPartition = TopicAndPartition(topic, 0)
     // wait until admin path for delete topic is deleted, signaling completion of topic deletion
-    assertTrue("Admin path /admin/delete_topic/test path not deleted in 1000ms even after a replica is restarted",
-      TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)), 1000))
-    assertTrue("Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted",
-      TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)), 100))
+    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getDeleteTopicPath(topic)),
+      "Admin path /admin/delete_topic/test path not deleted even after a replica is restarted")
+    TestUtils.waitUntilTrue(() => !ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)),
+      "Topic path /brokers/topics/test not deleted after /admin/delete_topic/test path is deleted")
     // ensure that logs from all replicas are deleted if delete topic is marked successful in zookeeper
     assertTrue("Replica logs not deleted after delete topic is complete",
       servers.foldLeft(true)((res, server) => res && server.getLogManager().getLog(topicAndPartition).isEmpty))
   }
-  */
 }
\ No newline at end of file
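
A note on the TestUtils change visible above: the old assertTrue(..., TestUtils.waitUntilTrue(condition, waitTime)) pattern is replaced by a waitUntilTrue overload that takes a failure message and fails the test itself once an internal timeout expires. A minimal sketch of such a helper, assuming a default timeout and poll interval (both values here are illustrative, not the actual TestUtils implementation):

    import junit.framework.Assert.fail

    // Hedged sketch: poll the condition until it holds, or fail the test
    // with the supplied message once the (assumed) timeout elapses.
    def waitUntilTrue(condition: () => Boolean, msg: String,
                      waitTime: Long = 5000L, pause: Long = 100L): Unit = {
      val startTime = System.currentTimeMillis()
      while (!condition()) {
        if (System.currentTimeMillis() > startTime + waitTime)
          fail(msg)
        Thread.sleep(pause)
      }
    }

This keeps the timeout policy in one place instead of hard-coding 1000ms at every call site, which is why the per-call wait times disappear from the test bodies above.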

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 5c48796..ad12116 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -17,14 +17,12 @@
 package kafka.server
 
 import junit.framework.Assert._
-import java.util.Properties
-import java.io.File
-import org.junit.{After, Before, Test}
+import org.junit.Test
 import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
 import kafka.common._
 import kafka.log.LogConfig
-import kafka.admin.AdminUtils
+import kafka.admin.{AdminOperationException, AdminUtils}
 import org.scalatest.junit.JUnit3Suite
 
 class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
@@ -48,4 +46,15 @@ class DynamicConfigChangeTest extends JUnit3Suite with KafkaServerTestHarness {
     }
   }
 
+  @Test
+  def testConfigChangeOnNonExistingTopic() {
+    val topic = TestUtils.tempTopic
+    try {
+      AdminUtils.changeTopicConfig(zkClient, topic, LogConfig(flushInterval = 10000).toProps)
+      fail("Should fail with AdminOperationException for topic doesn't exist")
+    } catch {
+      case e: AdminOperationException => // expected
+    }
+  }
+
 }
\ No newline at end of file
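
The new testConfigChangeOnNonExistingTopic relies on AdminUtils.changeTopicConfig rejecting topics that are absent from ZooKeeper. A plausible guard is sketched below, using the ZkUtils helpers seen elsewhere in this patch; this is a sketch of the behavior the test exercises, not the verbatim AdminUtils body:

    import java.util.Properties
    import org.I0Itec.zkclient.ZkClient
    import kafka.admin.AdminOperationException
    import kafka.utils.ZkUtils

    // Sketch: refuse config changes for topics whose ZK path is missing.
    def changeTopicConfig(zkClient: ZkClient, topic: String, configs: Properties) {
      if (!ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic)))
        throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
      // ... validate and write the new config, then notify brokers ...
    }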

http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 014e964..ab60e9b 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -101,7 +101,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     verifyNonDaemonThreadsStatus
   }
 
-  /* Temporarily disable the test until delete topic is fixed.
   @Test
   def testCleanShutdownWithDeleteTopicEnabled() {
     val newProps = TestUtils.createBrokerConfig(0, port)
@@ -114,7 +113,6 @@ class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
     Utils.rm(server.config.logDirs)
     verifyNonDaemonThreadsStatus
   }
-  */
 
   def verifyNonDaemonThreadsStatus() {
     assertEquals(0, Thread.getAllStackTraces.keySet().toArray

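The hunk above simply re-enables testCleanShutdownWithDeleteTopicEnabled, which had been commented out while delete topic was broken. The setup it depends on is the delete.topic.enable broker property, also set in DeleteTopicTest above. A minimal sketch of that setup inside a test body; the port value here is illustrative:

    import kafka.server.{KafkaConfig, KafkaServer}
    import kafka.utils.TestUtils

    // Sketch: start a broker with topic deletion enabled, then shut it
    // down cleanly, as the re-enabled test exercises.
    val props = TestUtils.createBrokerConfig(0, 9092) // 9092 is illustrative
    props.setProperty("delete.topic.enable", "true")  // disabled by default
    val server = new KafkaServer(new KafkaConfig(props))
    server.startup()
    server.shutdown()
    server.awaitShutdown()
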
http://git-wip-us.apache.org/repos/asf/kafka/blob/44c39c4e/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 00bfba4..034f361 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -87,6 +87,8 @@ object TestUtils extends Logging {
     f
   }
 
+  def tempTopic(): String = "testTopic" + random.nextInt(1000000)
+
   /**
    * Create a temporary relative directory
    */
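
The new tempTopic helper gives each test a throwaway topic name, so tests that need a topic guaranteed absent (such as testConfigChangeOnNonExistingTopic above) do not collide across runs. Note the random suffix only makes collisions unlikely rather than impossible:

    // Typical usage: a fresh name like "testTopic483920" per call.
    val topic = TestUtils.tempTopic()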