You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/19 01:31:43 UTC

git commit: KAFKA-1311 Add a flag to turn off delete topic until it is stable; reviewed by Joel Koshy and Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 74e220925 -> 4484916d6


KAFKA-1311 Add a flag to turn off delete topic until it is stable; reviewed by Joel Koshy and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4484916d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4484916d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4484916d

Branch: refs/heads/trunk
Commit: 4484916d656243384d91e172a8e0b7ca240785f0
Parents: 74e2209
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Mar 18 17:20:32 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Mar 18 17:31:35 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 19 +----
 .../controller/PartitionStateMachine.scala      |  3 +-
 .../kafka/controller/TopicDeletionManager.scala | 78 +++++++++++++-------
 .../main/scala/kafka/server/KafkaConfig.scala   |  4 +
 .../unit/kafka/admin/DeleteTopicTest.scala      | 12 ++-
 5 files changed, 69 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 6fef9df..686a0df 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -36,9 +36,9 @@ object TopicCommand {
     val opts = new TopicCommandOptions(args)
     
     // should have exactly one action
-    val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+    val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
     if(actions != 1) {
-      System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
+      System.err.println("Command must include exactly one action: --list, --describe, --create or --alter")
       opts.parser.printHelpOn(System.err)
       System.exit(1)
     }
@@ -52,8 +52,6 @@ object TopicCommand {
         createTopic(zkClient, opts)
       else if(opts.options.has(opts.alterOpt))
         alterTopic(zkClient, opts)
-      else if(opts.options.has(opts.deleteOpt))
-        deleteTopic(zkClient, opts)
       else if(opts.options.has(opts.listOpt))
         listTopics(zkClient, opts)
       else if(opts.options.has(opts.describeOpt))
@@ -119,14 +117,6 @@ object TopicCommand {
     }
   }
   
-  def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
-    val topics = getTopics(zkClient, opts)
-    topics.foreach { topic =>
-      AdminUtils.deleteTopic(zkClient, topic)
-      println("Topic \"%s\" queued for deletion.".format(topic))
-    }
-  }
-  
   def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     for(topic <- topics)
@@ -221,10 +211,9 @@ object TopicCommand {
     val listOpt = parser.accepts("list", "List all available topics.")
     val createOpt = parser.accepts("create", "Create a new topic.")
     val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
-    val deleteOpt = parser.accepts("delete", "Delete the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
-    val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+    val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
                                            "expression except for --create option")
                          .withRequiredArg
                          .describedAs("topic")
@@ -263,7 +252,7 @@ object TopicCommand {
 
     val options = parser.parse(args : _*)
 
-    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt)
+    val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt)
 
     def checkArgs() {
       // check required args

http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c69077e..c3e8d05 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -72,7 +72,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
   // register topic and partition change listeners
   def registerListeners() {
     registerTopicChangeListener()
-    registerDeleteTopicListener()
+    if(controller.config.deleteTopicEnable)
+      registerDeleteTopicListener()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 58f1c42..488dfd0 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -76,23 +76,28 @@ class TopicDeletionManager(controller: KafkaController,
   val deleteTopicsCond = controllerContext.controllerLock.newCondition()
   var deleteTopicStateChanged: Boolean = false
   var deleteTopicsThread: DeleteTopicsThread = null
+  val isDeleteTopicEnabled = controller.config.deleteTopicEnable
 
   /**
    * Invoked at the end of new controller initiation
    */
   def start() {
-    deleteTopicsThread = new DeleteTopicsThread()
-    deleteTopicStateChanged = true
-    deleteTopicsThread.start()
+    if(isDeleteTopicEnabled) {
+      deleteTopicsThread = new DeleteTopicsThread()
+      deleteTopicStateChanged = true
+      deleteTopicsThread.start()
+    }
   }
 
   /**
    * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
    */
   def shutdown() {
-    deleteTopicsThread.shutdown()
-    topicsToBeDeleted.clear()
-    topicsIneligibleForDeletion.clear()
+    if(isDeleteTopicEnabled) {
+      deleteTopicsThread.shutdown()
+      topicsToBeDeleted.clear()
+      topicsIneligibleForDeletion.clear()
+    }
   }
 
   /**
@@ -102,8 +107,10 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be deleted
    */
   def enqueueTopicsForDeletion(topics: Set[String]) {
-    topicsToBeDeleted ++= topics
-    resumeTopicDeletionThread()
+    if(isDeleteTopicEnabled) {
+      topicsToBeDeleted ++= topics
+      resumeTopicDeletionThread()
+    }
   }
 
   /**
@@ -115,10 +122,12 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics for which deletion can be resumed
    */
   def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
-    val topicsToResumeDeletion = topics & topicsToBeDeleted
-    if(topicsToResumeDeletion.size > 0) {
-      topicsIneligibleForDeletion --= topicsToResumeDeletion
-      resumeTopicDeletionThread()
+    if(isDeleteTopicEnabled) {
+      val topicsToResumeDeletion = topics & topicsToBeDeleted
+      if(topicsToResumeDeletion.size > 0) {
+        topicsIneligibleForDeletion --= topicsToResumeDeletion
+        resumeTopicDeletionThread()
+      }
     }
   }
 
@@ -131,14 +140,16 @@ class TopicDeletionManager(controller: KafkaController,
    * @param replicas Replicas for which deletion has failed
    */
   def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
-    val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
-    if(replicasThatFailedToDelete.size > 0) {
-      val topics = replicasThatFailedToDelete.map(_.topic)
-      debug("Deletion failed for replicas %s. Halting deletion for topics %s"
-        .format(replicasThatFailedToDelete.mkString(","), topics))
-      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
-      markTopicIneligibleForDeletion(topics)
-      resumeTopicDeletionThread()
+    if(isDeleteTopicEnabled) {
+      val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+      if(replicasThatFailedToDelete.size > 0) {
+        val topics = replicasThatFailedToDelete.map(_.topic)
+        debug("Deletion failed for replicas %s. Halting deletion for topics %s"
+          .format(replicasThatFailedToDelete.mkString(","), topics))
+        controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+        markTopicIneligibleForDeletion(topics)
+        resumeTopicDeletionThread()
+      }
     }
   }
 
@@ -150,22 +161,33 @@ class TopicDeletionManager(controller: KafkaController,
    * @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
    */
   def markTopicIneligibleForDeletion(topics: Set[String]) {
-    val newTopicsToHaltDeletion = topicsToBeDeleted & topics
-    topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
-    if(newTopicsToHaltDeletion.size > 0)
-      info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+    if(isDeleteTopicEnabled) {
+      val newTopicsToHaltDeletion = topicsToBeDeleted & topics
+      topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
+      if(newTopicsToHaltDeletion.size > 0)
+        info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+    }
   }
 
   def isTopicIneligibleForDeletion(topic: String): Boolean = {
-    topicsIneligibleForDeletion.contains(topic)
+    if(isDeleteTopicEnabled) {
+      topicsIneligibleForDeletion.contains(topic)
+    } else
+      true
   }
 
   def isTopicDeletionInProgress(topic: String): Boolean = {
-    controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+    if(isDeleteTopicEnabled) {
+      controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+    } else
+      false
   }
 
   def isTopicQueuedUpForDeletion(topic: String): Boolean = {
-    topicsToBeDeleted.contains(topic)
+    if(isDeleteTopicEnabled) {
+      topicsToBeDeleted.contains(topic)
+    } else
+      false
   }
 
   /**
@@ -336,7 +358,7 @@ class TopicDeletionManager(controller: KafkaController,
         if(topicsQueuedForDeletion.size > 0)
           info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
         topicsQueuedForDeletion.foreach { topic =>
-          // if all replicas are marked as deleted successfully, then topic deletion is done
+        // if all replicas are marked as deleted successfully, then topic deletion is done
           if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
             // clear up all state for this topic from controller cache and zookeeper
             completeDeleteTopic(topic)

http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 08de0ef..b0506d4 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -287,4 +287,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /** The required acks before the commit can be accepted. In general, the default (-1) should not be overridden. */
   val offsetCommitRequiredAcks = props.getShortInRange("offsets.commit.required.acks",
     OffsetManagerConfig.DefaultOffsetCommitRequiredAcks, (-1, offsetsTopicReplicationFactor))
+
+  /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */
+  val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 6db76a5..e704290 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -212,8 +212,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -252,8 +254,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topic = "test"
     val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(4)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -421,8 +425,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
   private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
     val expectedReplicaAssignment = Map(0  -> List(0, 1, 2))
     val topicAndPartition = TopicAndPartition(topic, 0)
+    val brokerConfigs = TestUtils.createBrokerConfigs(3)
+    brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
     // create brokers
-    val servers = TestUtils.createBrokerConfigs(3).map(b => TestUtils.createServer(new KafkaConfig(b)))
+    val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
     // create the topic
     AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
     // wait until replica log is created on every broker