You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/07 06:58:52 UTC
kafka git commit: KAFKA-3748: Add consumer-property to console tools consumer
Repository: kafka
Updated Branches:
refs/heads/trunk d9f052acc -> 06a57cf19
KAFKA-3748: Add consumer-property to console tools consumer
ijuma, harshach, edoardocomar: Can you please review the changes?
edoardocomar, I have addressed your comment about the extra space.
Author: Bharat Viswanadham <bh...@us.ibm.com>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1474 from bharatviswa504/Kafka-3748
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/06a57cf1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/06a57cf1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/06a57cf1
Branch: refs/heads/trunk
Commit: 06a57cf19c45e82245ada886ea885087ce60ab80
Parents: d9f052a
Author: Bharat Viswanadham <bh...@us.ibm.com>
Authored: Mon Jun 6 23:58:29 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jun 6 23:58:29 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/06a57cf1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 3b7a214..c1b5aee 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -167,6 +167,7 @@ object ConsoleConsumer extends Logging {
val props = new Properties
props.putAll(config.consumerProps)
+ props.putAll(config.extraConsumerProps)
props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
props.put("zookeeper.connect", config.zkConnectionStr)
@@ -189,6 +190,7 @@ object ConsoleConsumer extends Logging {
val props = new Properties
props.putAll(config.consumerProps)
+ props.putAll(config.extraConsumerProps)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -216,7 +218,11 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
- val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
+ val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
+ .withRequiredArg
+ .describedAs("consumer_prop")
+ .ofType(classOf[String])
+ val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that ${consumerPropertyOpt} takes precedence over this config.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])
@@ -291,6 +297,7 @@ object ConsoleConsumer extends Logging {
topicArg = options.valueOf(topicOrFilterOpt.head)
filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
}
+ val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
val consumerProps = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else