You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/04 13:26:56 UTC

kafka git commit: KAFKA-2684; Add force option to topic / config command so they can be called programatically

Repository: kafka
Updated Branches:
  refs/heads/trunk 03a1f7d39 -> b410ea37b


KAFKA-2684; Add force option to topic / config command so they can be called programatically

Tiny change to add a force option to the topic and config commands so they can be called programatically without requiring user input.

Author: Ben Stopford <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #351 from benstopford/CPKAFKA-61B


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b410ea37
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b410ea37
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b410ea37

Branch: refs/heads/trunk
Commit: b410ea37b092740bf3e2a6e04caa3cf681ebc0d3
Parents: 03a1f7d
Author: Ben Stopford <be...@gmail.com>
Authored: Wed May 4 14:26:41 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 4 14:26:41 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/ConfigCommand.scala | 12 +++++++-----
 core/src/main/scala/kafka/admin/TopicCommand.scala  | 13 +++++++++----
 2 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b410ea37/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 614e3fe..eaddd84 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -69,7 +69,7 @@ object ConfigCommand {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
     val entityType = opts.options.valueOf(opts.entityType)
     val entityName = opts.options.valueOf(opts.entityName)
-    warnOnMaxMessagesChange(configsToBeAdded)
+    warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
 
     // compile the final set of configs
     val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
@@ -85,14 +85,15 @@ object ConfigCommand {
     }
   }
 
-  def warnOnMaxMessagesChange(configs: Properties): Unit = {
+  def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
     val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
       case n: String => n.toInt
       case _ => -1
     }
     if (maxMessageBytes > Defaults.MaxMessageSize){
       error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
-      TopicCommand.askToProceed
+      if (!force)
+        TopicCommand.askToProceed
     }
   }
 
@@ -107,14 +108,14 @@ object ConfigCommand {
     for (entityName <- entityNames) {
       val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
       println("Configs for %s:%s are %s"
-                      .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+        .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
     }
   }
 
   private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
     val configsToBeAdded = opts.options.valuesOf(opts.addConfig).map(_.split("""\s*=\s*"""))
     require(configsToBeAdded.forall(config => config.length == 2),
-            "Invalid entity config: all configs to be added must be in the format \"key=val\".")
+      "Invalid entity config: all configs to be added must be in the format \"key=val\".")
     val props = new Properties
     configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
     if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
@@ -164,6 +165,7 @@ object ConfigCommand {
             .ofType(classOf[String])
             .withValuesSeparatedBy(',')
     val helpOpt = parser.accepts("help", "Print usage information.")
+    val forceOpt = parser.accepts("force", "Suppress console prompts")
     val options = parser.parse(args : _*)
 
     val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b410ea37/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9f1014f..029adea 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -97,13 +97,13 @@ object TopicCommand extends Logging {
     try {
       if (opts.options.has(opts.replicaAssignmentOpt)) {
         val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
-        warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
+        warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length, opts.options.has(opts.forceOpt))
         AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
       } else {
         CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
         val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
         val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
-        warnOnMaxMessagesChange(configs, replicas)
+        warnOnMaxMessagesChange(configs, replicas, opts.options.has(opts.forceOpt))
         val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                             else RackAwareMode.Enforced
         AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
@@ -326,6 +326,9 @@ object TopicCommand extends Logging {
                                         "if set when creating topics, the action will only execute if the topic does not already exist")
 
     val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
+
+    val forceOpt = parser.accepts("force", "Suppress console prompts")
+
     val options = parser.parse(args : _*)
 
     val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
@@ -354,7 +357,7 @@ object TopicCommand extends Logging {
       CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
     }
   }
-  def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = {
+  def warnOnMaxMessagesChange(configs: Properties, replicas: Integer, force: Boolean): Unit = {
     val maxMessageBytes =  configs.get(LogConfig.MaxMessageBytesProp) match {
       case n: String => n.toInt
       case _ => -1
@@ -362,7 +365,8 @@ object TopicCommand extends Logging {
     if (maxMessageBytes > Defaults.MaxMessageSize)
       if (replicas > 1) {
         error(longMessageSizeWarning(maxMessageBytes))
-        askToProceed
+        if (!force)
+          askToProceed
       }
       else
         warn(shortMessageSizeWarning(maxMessageBytes))
@@ -405,3 +409,4 @@ object TopicCommand extends Logging {
       s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
   }
 }
+