You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2014/04/12 05:31:27 UTC
git commit: kafka-1363; testTopicConfigChangesDuringDeleteTopic hangs;
patched by Timothy Chen; reviewed by Guozhang Wang, Neha Narkhede and Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 2d429e19d -> a3a2cba84
kafka-1363; testTopicConfigChangesDuringDeleteTopic hangs; patched by Timothy Chen; reviewed by Guozhang Wang, Neha Narkhede and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a3a2cba8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a3a2cba8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a3a2cba8
Branch: refs/heads/trunk
Commit: a3a2cba842a9945d4ce7b032e311d17956c33249
Parents: 2d429e1
Author: Timothy Chen <tn...@gmail.com>
Authored: Fri Apr 11 20:31:13 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Apr 11 20:31:13 2014 -0700
----------------------------------------------------------------------
.../kafka/controller/TopicDeletionManager.scala | 16 +++++++++++++---
1 file changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/a3a2cba8/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index d29e556..e4bc243 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -81,12 +81,14 @@ class TopicDeletionManager(controller: KafkaController,
val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
var deleteTopicsThread: DeleteTopicsThread = null
val isDeleteTopicEnabled = controller.config.deleteTopicEnable
+ @volatile var isShuttingDown = false
/**
* Invoked at the end of new controller initiation
*/
def start() {
if(isDeleteTopicEnabled) {
+ isShuttingDown = false
deleteTopicsThread = new DeleteTopicsThread()
deleteTopicStateChanged.set(true)
deleteTopicsThread.start()
@@ -98,10 +100,13 @@ class TopicDeletionManager(controller: KafkaController,
*/
def shutdown() {
if(isDeleteTopicEnabled) {
+ isShuttingDown = true
+ resumeTopicDeletionThread()
deleteTopicsThread.shutdown()
topicsToBeDeleted.clear()
partitionsToBeDeleted.clear()
topicsIneligibleForDeletion.clear()
+ isShuttingDown = false
}
}
@@ -202,7 +207,7 @@ class TopicDeletionManager(controller: KafkaController,
*/
private def awaitTopicDeletionNotification() {
inLock(deleteLock) {
- while(!deleteTopicStateChanged.compareAndSet(true, false)) {
+ while(!isShuttingDown && !deleteTopicStateChanged.compareAndSet(true, false)) {
info("Waiting for signal to start or continue topic deletion")
deleteTopicsCond.await()
}
@@ -360,15 +365,20 @@ class TopicDeletionManager(controller: KafkaController,
}
}
- class DeleteTopicsThread() extends ShutdownableThread("delete-topics-thread") {
+ class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread", isInterruptible = false) {
val zkClient = controllerContext.zkClient
override def doWork() {
awaitTopicDeletionNotification()
+ if(!isRunning.get)
+ return
+
inLock(controllerContext.controllerLock) {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
- if(topicsQueuedForDeletion.size > 0)
+
+ if(!topicsQueuedForDeletion.isEmpty)
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
+
topicsQueuedForDeletion.foreach { topic =>
// if all replicas are marked as deleted successfully, then topic deletion is done
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {