You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/14 23:28:30 UTC
kafka git commit: KAFKA-2536: topics tool should allow users to alter
topic configuration
Repository: kafka
Updated Branches:
refs/heads/trunk 5013a41a5 -> 362613347
KAFKA-2536: topics tool should allow users to alter topic configuration
This is a minimal revert of some backward incompatible changes made in KAFKA-2205, with the addition of the deprecation logging message.
Author: Grant Henke <gr...@gmail.com>
Reviewers: Gwen Shapira
Closes #305 from granthenke/topic-configs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36261334
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36261334
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36261334
Branch: refs/heads/trunk
Commit: 362613347371e9d74184e900ab80ba230940a5c8
Parents: 5013a41
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Oct 14 14:28:14 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Oct 14 14:28:14 2015 -0700
----------------------------------------------------------------------
.../main/scala/kafka/admin/TopicCommand.scala | 60 ++++++++++++++------
1 file changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/36261334/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f1405a5..3abac62 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -36,15 +36,15 @@ import kafka.coordinator.ConsumerCoordinator
object TopicCommand extends Logging {
def main(args: Array[String]): Unit = {
-
+
val opts = new TopicCommandOptions(args)
-
+
if(args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
-
+
// should have exactly one action
val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
- if(actions != 1)
+ if(actions != 1)
CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
opts.checkArgs()
@@ -108,6 +108,20 @@ object TopicCommand extends Logging {
opts.options.valueOf(opts.zkConnectOpt)))
}
topics.foreach { topic =>
+ val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+ if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+ println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
+ println(" Going forward, please use kafka-configs.sh for this functionality")
+
+ val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+ val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+ // compile the final set of configs
+ configs.putAll(configsToBeAdded)
+ configsToBeDeleted.foreach(config => configs.remove(config))
+ AdminUtils.changeTopicConfig(zkClient, topic, configs)
+ println("Updated config for topic \"%s\".".format(topic))
+ }
+
if(opts.options.has(opts.partitionsOpt)) {
if (topic == ConsumerCoordinator.OffsetsTopicName) {
throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
@@ -121,7 +135,7 @@ object TopicCommand extends Logging {
}
}
}
-
+
def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
val topics = getTopics(zkClient, opts)
for(topic <- topics) {
@@ -211,6 +225,18 @@ object TopicCommand extends Logging {
props
}
+ def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
+ if (opts.options.has(opts.deleteConfigOpt)) {
+ val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
+ val propsToBeDeleted = new Properties
+ configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
+ LogConfig.validateNames(propsToBeDeleted)
+ configsToBeDeleted
+ }
+ else
+ Seq.empty
+ }
+
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
@@ -225,7 +251,7 @@ object TopicCommand extends Logging {
}
ret.toMap
}
-
+
class TopicCommandOptions(args: Array[String]) {
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
@@ -236,7 +262,7 @@ object TopicCommand extends Logging {
val listOpt = parser.accepts("list", "List all available topics.")
val createOpt = parser.accepts("create", "Create a new topic.")
val deleteOpt = parser.accepts("delete", "Delete a topic")
- val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic")
+ val alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.")
val describeOpt = parser.accepts("describe", "List details for the given topics.")
val helpOpt = parser.accepts("help", "Print usage information.")
val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
@@ -245,12 +271,16 @@ object TopicCommand extends Logging {
.describedAs("topic")
.ofType(classOf[String])
val nl = System.getProperty("line.separator")
- val configOpt = parser.accepts("config", "A configuration override for the topic being created." +
- "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
- "See the Kafka documentation for full details on the topic configs.")
- .withRequiredArg
- .describedAs("name=value")
- .ofType(classOf[String])
+ val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered." +
+ "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
+ "See the Kafka documentation for full details on the topic configs.")
+ .withRequiredArg
+ .describedAs("name=value")
+ .ofType(classOf[String])
+ val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).")
+ .withRequiredArg
+ .describedAs("name")
+ .ofType(classOf[String])
val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
"altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
.withRequiredArg
@@ -284,11 +314,10 @@ object TopicCommand extends Logging {
// check invalid args
CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
+ CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
- // Topic configs cannot be changed with alterTopic
- CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt))
if(options.has(createOpt))
CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
@@ -299,5 +328,4 @@ object TopicCommand extends Logging {
allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
}
}
-
}