You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/03/19 01:31:43 UTC
git commit: KAFKA-1311 Add a flag to turn off delete topic until it
is stable; reviewed by Joel Koshy and Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 74e220925 -> 4484916d6
KAFKA-1311 Add a flag to turn off delete topic until it is stable; reviewed by Joel Koshy and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4484916d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4484916d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4484916d
Branch: refs/heads/trunk
Commit: 4484916d656243384d91e172a8e0b7ca240785f0
Parents: 74e2209
Author: Neha Narkhede <ne...@gmail.com>
Authored: Tue Mar 18 17:20:32 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue Mar 18 17:31:35 2014 -0700
----------------------------------------------------------------------
.../main/scala/kafka/admin/TopicCommand.scala | 19 +----
.../controller/PartitionStateMachine.scala | 3 +-
.../kafka/controller/TopicDeletionManager.scala | 78 +++++++++++++-------
.../main/scala/kafka/server/KafkaConfig.scala | 4 +
.../unit/kafka/admin/DeleteTopicTest.scala | 12 ++-
5 files changed, 69 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 6fef9df..686a0df 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -36,9 +36,9 @@ object TopicCommand {
val opts = new TopicCommandOptions(args)
// should have exactly one action
- val actions = Seq(opts.createOpt, opts.deleteOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
+ val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt).count(opts.options.has _)
if(actions != 1) {
- System.err.println("Command must include exactly one action: --list, --describe, --create, --delete, or --alter")
+ System.err.println("Command must include exactly one action: --list, --describe, --create or --alter")
opts.parser.printHelpOn(System.err)
System.exit(1)
}
@@ -52,8 +52,6 @@ object TopicCommand {
createTopic(zkClient, opts)
else if(opts.options.has(opts.alterOpt))
alterTopic(zkClient, opts)
- else if(opts.options.has(opts.deleteOpt))
- deleteTopic(zkClient, opts)
else if(opts.options.has(opts.listOpt))
listTopics(zkClient, opts)
else if(opts.options.has(opts.describeOpt))
@@ -119,14 +117,6 @@ object TopicCommand {
}
}
- def deleteTopic(zkClient: ZkClient, opts: TopicCommandOptions) {
- val topics = getTopics(zkClient, opts)
- topics.foreach { topic =>
- AdminUtils.deleteTopic(zkClient, topic)
- println("Topic \"%s\" queued for deletion.".format(topic))
- }
- }
-
def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
for(topic <- topics)
@@ -221,10 +211,9 @@ object TopicCommand {
val listOpt = parser.accepts("list", "List all available topics.")
val createOpt = parser.accepts("create", "Create a new topic.")
val alterOpt = parser.accepts("alter", "Alter the configuration for the topic.")
- val deleteOpt = parser.accepts("delete", "Delete the topic.")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
- val topicOpt = parser.accepts("topic", "The topic to be create, alter, delete, or describe. Can also accept a regular " +
+ val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
"expression except for --create option")
.withRequiredArg
.describedAs("topic")
@@ -263,7 +252,7 @@ object TopicCommand {
val options = parser.parse(args : _*)
- val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, deleteOpt, describeOpt, listOpt)
+ val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt)
def checkArgs() {
// check required args
http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index c69077e..c3e8d05 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -72,7 +72,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// register topic and partition change listeners
def registerListeners() {
registerTopicChangeListener()
- registerDeleteTopicListener()
+ if(controller.config.deleteTopicEnable)
+ registerDeleteTopicListener()
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index 58f1c42..488dfd0 100644
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -76,23 +76,28 @@ class TopicDeletionManager(controller: KafkaController,
val deleteTopicsCond = controllerContext.controllerLock.newCondition()
var deleteTopicStateChanged: Boolean = false
var deleteTopicsThread: DeleteTopicsThread = null
+ val isDeleteTopicEnabled = controller.config.deleteTopicEnable
/**
* Invoked at the end of new controller initiation
*/
def start() {
- deleteTopicsThread = new DeleteTopicsThread()
- deleteTopicStateChanged = true
- deleteTopicsThread.start()
+ if(isDeleteTopicEnabled) {
+ deleteTopicsThread = new DeleteTopicsThread()
+ deleteTopicStateChanged = true
+ deleteTopicsThread.start()
+ }
}
/**
* Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared
*/
def shutdown() {
- deleteTopicsThread.shutdown()
- topicsToBeDeleted.clear()
- topicsIneligibleForDeletion.clear()
+ if(isDeleteTopicEnabled) {
+ deleteTopicsThread.shutdown()
+ topicsToBeDeleted.clear()
+ topicsIneligibleForDeletion.clear()
+ }
}
/**
@@ -102,8 +107,10 @@ class TopicDeletionManager(controller: KafkaController,
* @param topics Topics that should be deleted
*/
def enqueueTopicsForDeletion(topics: Set[String]) {
- topicsToBeDeleted ++= topics
- resumeTopicDeletionThread()
+ if(isDeleteTopicEnabled) {
+ topicsToBeDeleted ++= topics
+ resumeTopicDeletionThread()
+ }
}
/**
@@ -115,10 +122,12 @@ class TopicDeletionManager(controller: KafkaController,
* @param topics Topics for which deletion can be resumed
*/
def resumeDeletionForTopics(topics: Set[String] = Set.empty) {
- val topicsToResumeDeletion = topics & topicsToBeDeleted
- if(topicsToResumeDeletion.size > 0) {
- topicsIneligibleForDeletion --= topicsToResumeDeletion
- resumeTopicDeletionThread()
+ if(isDeleteTopicEnabled) {
+ val topicsToResumeDeletion = topics & topicsToBeDeleted
+ if(topicsToResumeDeletion.size > 0) {
+ topicsIneligibleForDeletion --= topicsToResumeDeletion
+ resumeTopicDeletionThread()
+ }
}
}
@@ -131,14 +140,16 @@ class TopicDeletionManager(controller: KafkaController,
* @param replicas Replicas for which deletion has failed
*/
def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
- val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
- if(replicasThatFailedToDelete.size > 0) {
- val topics = replicasThatFailedToDelete.map(_.topic)
- debug("Deletion failed for replicas %s. Halting deletion for topics %s"
- .format(replicasThatFailedToDelete.mkString(","), topics))
- controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
- markTopicIneligibleForDeletion(topics)
- resumeTopicDeletionThread()
+ if(isDeleteTopicEnabled) {
+ val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
+ if(replicasThatFailedToDelete.size > 0) {
+ val topics = replicasThatFailedToDelete.map(_.topic)
+ debug("Deletion failed for replicas %s. Halting deletion for topics %s"
+ .format(replicasThatFailedToDelete.mkString(","), topics))
+ controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
+ markTopicIneligibleForDeletion(topics)
+ resumeTopicDeletionThread()
+ }
}
}
@@ -150,22 +161,33 @@ class TopicDeletionManager(controller: KafkaController,
* @param topics Topics that should be marked ineligible for deletion. No op if the topic is was not previously queued up for deletion
*/
def markTopicIneligibleForDeletion(topics: Set[String]) {
- val newTopicsToHaltDeletion = topicsToBeDeleted & topics
- topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
- if(newTopicsToHaltDeletion.size > 0)
- info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+ if(isDeleteTopicEnabled) {
+ val newTopicsToHaltDeletion = topicsToBeDeleted & topics
+ topicsIneligibleForDeletion ++= newTopicsToHaltDeletion
+ if(newTopicsToHaltDeletion.size > 0)
+ info("Halted deletion of topics %s".format(newTopicsToHaltDeletion.mkString(",")))
+ }
}
def isTopicIneligibleForDeletion(topic: String): Boolean = {
- topicsIneligibleForDeletion.contains(topic)
+ if(isDeleteTopicEnabled) {
+ topicsIneligibleForDeletion.contains(topic)
+ } else
+ true
}
def isTopicDeletionInProgress(topic: String): Boolean = {
- controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+ if(isDeleteTopicEnabled) {
+ controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)
+ } else
+ false
}
def isTopicQueuedUpForDeletion(topic: String): Boolean = {
- topicsToBeDeleted.contains(topic)
+ if(isDeleteTopicEnabled) {
+ topicsToBeDeleted.contains(topic)
+ } else
+ false
}
/**
@@ -336,7 +358,7 @@ class TopicDeletionManager(controller: KafkaController,
if(topicsQueuedForDeletion.size > 0)
info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))
topicsQueuedForDeletion.foreach { topic =>
- // if all replicas are marked as deleted successfully, then topic deletion is done
+ // if all replicas are marked as deleted successfully, then topic deletion is done
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// clear up all state for this topic from controller cache and zookeeper
completeDeleteTopic(topic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 08de0ef..b0506d4 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -287,4 +287,8 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/** The required acks before the commit can be accepted. In general, the default (-1) should not be overridden. */
val offsetCommitRequiredAcks = props.getShortInRange("offsets.commit.required.acks",
OffsetManagerConfig.DefaultOffsetCommitRequiredAcks, (-1, offsetsTopicReplicationFactor))
+
+ /* Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off */
+ val deleteTopicEnable = props.getBoolean("delete.topic.enable", false)
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4484916d/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 6db76a5..e704290 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -212,8 +212,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
val topicAndPartition = TopicAndPartition(topic, 0)
+ val brokerConfigs = TestUtils.createBrokerConfigs(4)
+ brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
// create brokers
- val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+ val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -252,8 +254,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topic = "test"
val topicAndPartition = TopicAndPartition(topic, 0)
+ val brokerConfigs = TestUtils.createBrokerConfigs(4)
+ brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
// create brokers
- val allServers = TestUtils.createBrokerConfigs(4).map(b => TestUtils.createServer(new KafkaConfig(b)))
+ val allServers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
val servers = allServers.filter(s => expectedReplicaAssignment(0).contains(s.config.brokerId))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
@@ -421,8 +425,10 @@ class DeleteTopicTest extends JUnit3Suite with ZooKeeperTestHarness {
private def createTestTopicAndCluster(topic: String): Seq[KafkaServer] = {
val expectedReplicaAssignment = Map(0 -> List(0, 1, 2))
val topicAndPartition = TopicAndPartition(topic, 0)
+ val brokerConfigs = TestUtils.createBrokerConfigs(3)
+ brokerConfigs.foreach(p => p.setProperty("delete.topic.enable", "true"))
// create brokers
- val servers = TestUtils.createBrokerConfigs(3).map(b => TestUtils.createServer(new KafkaConfig(b)))
+ val servers = brokerConfigs.map(b => TestUtils.createServer(new KafkaConfig(b)))
// create the topic
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkClient, topic, expectedReplicaAssignment)
// wait until replica log is created on every broker