You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/14 23:28:30 UTC

kafka git commit: KAFKA-2536: topics tool should allow users to alter topic configuration

Repository: kafka
Updated Branches:
  refs/heads/trunk 5013a41a5 -> 362613347


KAFKA-2536: topics tool should allow users to alter topic configuration

This is a minimal revert of some backward-incompatible changes made in KAFKA-2205, with the addition of a deprecation warning message.

Author: Grant Henke <gr...@gmail.com>

Reviewers: Gwen Shapira

Closes #305 from granthenke/topic-configs


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36261334
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36261334
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36261334

Branch: refs/heads/trunk
Commit: 362613347371e9d74184e900ab80ba230940a5c8
Parents: 5013a41
Author: Grant Henke <gr...@gmail.com>
Authored: Wed Oct 14 14:28:14 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Wed Oct 14 14:28:14 2015 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   | 60 ++++++++++++++------
 1 file changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/36261334/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index f1405a5..3abac62 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -36,15 +36,15 @@ import kafka.coordinator.ConsumerCoordinator
 object TopicCommand extends Logging {
 
   def main(args: Array[String]): Unit = {
-    
+
     val opts = new TopicCommandOptions(args)
-    
+
     if(args.length == 0)
       CommandLineUtils.printUsageAndDie(opts.parser, "Create, delete, describe, or change a topic.")
-    
+
     // should have exactly one action
     val actions = Seq(opts.createOpt, opts.listOpt, opts.alterOpt, opts.describeOpt, opts.deleteOpt).count(opts.options.has _)
-    if(actions != 1) 
+    if(actions != 1)
       CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --list, --describe, --create, --alter or --delete")
 
     opts.checkArgs()
@@ -108,6 +108,20 @@ object TopicCommand extends Logging {
           opts.options.valueOf(opts.zkConnectOpt)))
     }
     topics.foreach { topic =>
+      val configs = AdminUtils.fetchEntityConfig(zkClient, ConfigType.Topic, topic)
+      if(opts.options.has(opts.configOpt) || opts.options.has(opts.deleteConfigOpt)) {
+        println("WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.")
+        println("         Going forward, please use kafka-configs.sh for this functionality")
+
+        val configsToBeAdded = parseTopicConfigsToBeAdded(opts)
+        val configsToBeDeleted = parseTopicConfigsToBeDeleted(opts)
+        // compile the final set of configs
+        configs.putAll(configsToBeAdded)
+        configsToBeDeleted.foreach(config => configs.remove(config))
+        AdminUtils.changeTopicConfig(zkClient, topic, configs)
+        println("Updated config for topic \"%s\".".format(topic))
+      }
+
       if(opts.options.has(opts.partitionsOpt)) {
         if (topic == ConsumerCoordinator.OffsetsTopicName) {
           throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
@@ -121,7 +135,7 @@ object TopicCommand extends Logging {
       }
     }
   }
-  
+
   def listTopics(zkClient: ZkClient, opts: TopicCommandOptions) {
     val topics = getTopics(zkClient, opts)
     for(topic <- topics) {
@@ -211,6 +225,18 @@ object TopicCommand extends Logging {
     props
   }
 
+  def parseTopicConfigsToBeDeleted(opts: TopicCommandOptions): Seq[String] = {
+    if (opts.options.has(opts.deleteConfigOpt)) {
+      val configsToBeDeleted = opts.options.valuesOf(opts.deleteConfigOpt).map(_.trim())
+      val propsToBeDeleted = new Properties
+      configsToBeDeleted.foreach(propsToBeDeleted.setProperty(_, ""))
+      LogConfig.validateNames(propsToBeDeleted)
+      configsToBeDeleted
+    }
+    else
+      Seq.empty
+  }
+
   def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
     val partitionList = replicaAssignmentList.split(",")
     val ret = new mutable.HashMap[Int, List[Int]]()
@@ -225,7 +251,7 @@ object TopicCommand extends Logging {
     }
     ret.toMap
   }
-  
+
   class TopicCommandOptions(args: Array[String]) {
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
@@ -236,7 +262,7 @@ object TopicCommand extends Logging {
     val listOpt = parser.accepts("list", "List all available topics.")
     val createOpt = parser.accepts("create", "Create a new topic.")
     val deleteOpt = parser.accepts("delete", "Delete a topic")
-    val alterOpt = parser.accepts("alter", "Alter the number of partitions and/or replica assignment for a topic")
+    val alterOpt = parser.accepts("alter", "Alter the number of partitions, replica assignment, and/or configuration for the topic.")
     val describeOpt = parser.accepts("describe", "List details for the given topics.")
     val helpOpt = parser.accepts("help", "Print usage information.")
     val topicOpt = parser.accepts("topic", "The topic to be create, alter or describe. Can also accept a regular " +
@@ -245,12 +271,16 @@ object TopicCommand extends Logging {
                          .describedAs("topic")
                          .ofType(classOf[String])
     val nl = System.getProperty("line.separator")
-    val configOpt = parser.accepts("config", "A configuration override for the topic being created."  +
-                                                         "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
-                                                         "See the Kafka documentation for full details on the topic configs.")
-                          .withRequiredArg
-                          .describedAs("name=value")
-                          .ofType(classOf[String])
+    val configOpt = parser.accepts("config", "A topic configuration override for the topic being created or altered."  +
+                                             "The following is a list of valid configurations: " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
+                                             "See the Kafka documentation for full details on the topic configs.")
+                           .withRequiredArg
+                           .describedAs("name=value")
+                           .ofType(classOf[String])
+    val deleteConfigOpt = parser.accepts("delete-config", "A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).")
+                           .withRequiredArg
+                           .describedAs("name")
+                           .ofType(classOf[String])
     val partitionsOpt = parser.accepts("partitions", "The number of partitions for the topic being created or " +
       "altered (WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected")
                            .withRequiredArg
@@ -284,11 +314,10 @@ object TopicCommand extends Logging {
 
       // check invalid args
       CommandLineUtils.checkInvalidArgs(parser, options, configOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
+      CommandLineUtils.checkInvalidArgs(parser, options, deleteConfigOpt, allTopicLevelOpts -- Set(alterOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, partitionsOpt, allTopicLevelOpts -- Set(alterOpt, createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, replicationFactorOpt, allTopicLevelOpts -- Set(createOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, allTopicLevelOpts -- Set(createOpt,alterOpt))
-      // Topic configs cannot be changed with alterTopic
-      CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(configOpt))
       if(options.has(createOpt))
           CommandLineUtils.checkInvalidArgs(parser, options, replicaAssignmentOpt, Set(partitionsOpt, replicationFactorOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, reportUnderReplicatedPartitionsOpt,
@@ -299,5 +328,4 @@ object TopicCommand extends Logging {
         allTopicLevelOpts -- Set(describeOpt) + reportUnderReplicatedPartitionsOpt + reportUnavailablePartitionsOpt)
     }
   }
-  
 }