You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/12 05:31:27 UTC

git commit: kafka-1363; testTopicConfigChangesDuringDeleteTopic hangs; patched by Timothy Chen; reviewed by Guozhang Wang, Neha Narkhede and Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 2d429e19d -> a3a2cba84


kafka-1363; testTopicConfigChangesDuringDeleteTopic hangs; patched by Timothy Chen; reviewed by Guozhang Wang, Neha Narkhede and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3a2cba8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3a2cba8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3a2cba8

Branch: refs/heads/trunk
Commit: a3a2cba842a9945d4ce7b032e311d17956c33249
Parents: 2d429e1
Author: Timothy Chen <tn...@gmail.com>
Authored: Fri Apr 11 20:31:13 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Apr 11 20:31:13 2014 -0700

----------------------------------------------------------------------
 .../kafka/controller/TopicDeletionManager.scala     | 16 +++++++++++++---
 1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a3a2cba8/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index d29e556..e4bc243 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -81,12 +81,14 @@ class TopicDeletionManager(controller: KafkaController,
   val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
   var deleteTopicsThread: DeleteTopicsThread = null
   val isDeleteTopicEnabled = controller.config.deleteTopicEnable
+  @volatile var isShuttingDown = false
 
   /**
    * Invoked at the end of new controller initiation
    */
   def start() {
     if(isDeleteTopicEnabled) {
+      isShuttingDown = false
       deleteTopicsThread = new DeleteTopicsThread()
       deleteTopicStateChanged.set(true)
       deleteTopicsThread.start()
@@ -98,10 +100,13 @@ class TopicDeletionManager(controller: KafkaController,
    */
   def shutdown() {
     if(isDeleteTopicEnabled) {
+      isShuttingDown = true
+      resumeTopicDeletionThread()
       deleteTopicsThread.shutdown()
       topicsToBeDeleted.clear()
       partitionsToBeDeleted.clear()
       topicsIneligibleForDeletion.clear()
+      isShuttingDown = false
     }
   }
 
@@ -202,7 +207,7 @@ class TopicDeletionManager(controller: KafkaController,
    */
   private def awaitTopicDeletionNotification() {
     inLock(deleteLock) {
-      while(!deleteTopicStateChanged.compareAndSet(true, false)) {
+      while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) {
         info("Waiting for signal to start or continue topic deletion")
         deleteTopicsCond.await()
       }
@@ -360,15 +365,20 @@ class TopicDeletionManager(controller: KafkaController,
     }
   }
 
-  class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
+  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) {
     val zkClient = controllerContext.zkClient
     override def doWork() {
       awaitTopicDeletionNotification()
 
+      if(!isRunning.get)
+        return
+
       inLock(controllerContext.controllerLock) {
         val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
-        if(topicsQueuedForDeletion.size > 0)
+
+        if(!topicsQueuedForDeletion.isEmpty)
           info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
+
         topicsQueuedForDeletion.foreach { topic =>
         // if all replicas are marked as deleted successfully, then topic deletion is done
           if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {