You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/06/07 06:58:52 UTC

kafka git commit: KAFKA-3748: Add consumer-property to console tools consumer

Repository: kafka
Updated Branches:
  refs/heads/trunk d9f052acc -> 06a57cf19


KAFKA-3748: Add consumer-property to console tools consumer

ijuma harshach edoardocomar Can you please review the changes.

edoardocomar I have addressed your comment of extra space.

Author: Bharat Viswanadham <bh...@us.ibm.com>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1474 from bharatviswa504/Kafka-3748


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/06a57cf1
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/06a57cf1
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/06a57cf1

Branch: refs/heads/trunk
Commit: 06a57cf19c45e82245ada886ea885087ce60ab80
Parents: d9f052a
Author: Bharat Viswanadham <bh...@us.ibm.com>
Authored: Mon Jun 6 23:58:29 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Mon Jun 6 23:58:29 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/06a57cf1/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 3b7a214..c1b5aee 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -167,6 +167,7 @@ object ConsoleConsumer extends Logging {
     val props = new Properties
 
     props.putAll(config.consumerProps)
+    props.putAll(config.extraConsumerProps)
     props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
     props.put("zookeeper.connect", config.zkConnectionStr)
 
@@ -189,6 +190,7 @@ object ConsoleConsumer extends Logging {
     val props = new Properties
 
     props.putAll(config.consumerProps)
+    props.putAll(config.extraConsumerProps)
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
@@ -216,7 +218,11 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("urls")
       .ofType(classOf[String])
-    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
+    val consumerPropertyOpt = parser.accepts("consumer-property", "A mechanism to pass user-defined properties in the form key=value to the consumer.")
+      .withRequiredArg
+      .describedAs("consumer_prop")
+      .ofType(classOf[String])
+    val consumerConfigOpt = parser.accepts("consumer.config", s"Consumer config properties file. Note that ${consumerPropertyOpt} takes precedence over this config.")
       .withRequiredArg
       .describedAs("config file")
       .ofType(classOf[String])
@@ -291,6 +297,7 @@ object ConsoleConsumer extends Logging {
       topicArg = options.valueOf(topicOrFilterOpt.head)
       filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
     }
+    val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
     val consumerProps = if (options.has(consumerConfigOpt))
       Utils.loadProps(options.valueOf(consumerConfigOpt))
     else