You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/05/04 13:26:56 UTC
kafka git commit: KAFKA-2684;
Add force option to topic / config command so they can be called
programmatically
Repository: kafka
Updated Branches:
refs/heads/trunk 03a1f7d39 -> b410ea37b
KAFKA-2684; Add force option to topic / config command so they can be called programmatically
Tiny change to add a force option to the topic and config commands so they can be called programmatically without requiring user input.
Author: Ben Stopford <be...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #351 from benstopford/CPKAFKA-61B
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b410ea37
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b410ea37
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b410ea37
Branch: refs/heads/trunk
Commit: b410ea37b092740bf3e2a6e04caa3cf681ebc0d3
Parents: 03a1f7d
Author: Ben Stopford <be...@gmail.com>
Authored: Wed May 4 14:26:41 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed May 4 14:26:41 2016 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/admin/ConfigCommand.scala | 12 +++++++-----
core/src/main/scala/kafka/admin/TopicCommand.scala | 13 +++++++++----
2 files changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b410ea37/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 614e3fe..eaddd84 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -69,7 +69,7 @@ object ConfigCommand {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
val entityType = opts.options.valueOf(opts.entityType)
val entityName = opts.options.valueOf(opts.entityName)
- warnOnMaxMessagesChange(configsToBeAdded)
+ warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
// compile the final set of configs
val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
@@ -85,14 +85,15 @@ object ConfigCommand {
}
}
- def warnOnMaxMessagesChange(configs: Properties): Unit = {
+ def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
case n: String => n.toInt
case _ => -1
}
if (maxMessageBytes > Defaults.MaxMessageSize){
error(TopicCommand.longMessageSizeWarning(maxMessageBytes))
- TopicCommand.askToProceed
+ if (!force)
+ TopicCommand.askToProceed
}
}
@@ -107,14 +108,14 @@ object ConfigCommand {
for (entityName <- entityNames) {
val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, entityName)
println("Configs for %s:%s are %s"
- .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
+ .format(entityType, entityName, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
}
}
private[admin] def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
val configsToBeAdded = opts.options.valuesOf(opts.addConfig).map(_.split("""\s*=\s*"""))
require(configsToBeAdded.forall(config => config.length == 2),
- "Invalid entity config: all configs to be added must be in the format \"key=val\".")
+ "Invalid entity config: all configs to be added must be in the format \"key=val\".")
val props = new Properties
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).trim))
if (props.containsKey(LogConfig.MessageFormatVersionProp)) {
@@ -164,6 +165,7 @@ object ConfigCommand {
.ofType(classOf[String])
.withValuesSeparatedBy(',')
val helpOpt = parser.accepts("help", "Print usage information.")
+ val forceOpt = parser.accepts("force", "Suppress console prompts")
val options = parser.parse(args : _*)
val allOpts: Set[OptionSpec[_]] = Set(alterOpt, describeOpt, entityType, entityName, addConfig, deleteConfig, helpOpt)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b410ea37/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index 9f1014f..029adea 100755
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -97,13 +97,13 @@ object TopicCommand extends Logging {
try {
if (opts.options.has(opts.replicaAssignmentOpt)) {
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
- warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length)
+ warnOnMaxMessagesChange(configs, assignment.valuesIterator.next().length, opts.options.has(opts.forceOpt))
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
} else {
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
- warnOnMaxMessagesChange(configs, replicas)
+ warnOnMaxMessagesChange(configs, replicas, opts.options.has(opts.forceOpt))
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
@@ -326,6 +326,9 @@ object TopicCommand extends Logging {
"if set when creating topics, the action will only execute if the topic does not already exist")
val disableRackAware = parser.accepts("disable-rack-aware", "Disable rack aware replica assignment")
+
+ val forceOpt = parser.accepts("force", "Suppress console prompts")
+
val options = parser.parse(args : _*)
val allTopicLevelOpts: Set[OptionSpec[_]] = Set(alterOpt, createOpt, describeOpt, listOpt, deleteOpt)
@@ -354,7 +357,7 @@ object TopicCommand extends Logging {
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt))
}
}
- def warnOnMaxMessagesChange(configs: Properties, replicas: Integer): Unit = {
+ def warnOnMaxMessagesChange(configs: Properties, replicas: Integer, force: Boolean): Unit = {
val maxMessageBytes = configs.get(LogConfig.MaxMessageBytesProp) match {
case n: String => n.toInt
case _ => -1
@@ -362,7 +365,8 @@ object TopicCommand extends Logging {
if (maxMessageBytes > Defaults.MaxMessageSize)
if (replicas > 1) {
error(longMessageSizeWarning(maxMessageBytes))
- askToProceed
+ if (!force)
+ askToProceed
}
else
warn(shortMessageSizeWarning(maxMessageBytes))
@@ -405,3 +409,4 @@ object TopicCommand extends Logging {
s"- Default Consumer fetch.message.max.bytes: ${ConsumerConfig.FetchSize}\n\n"
}
}
+