You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/05/27 23:56:05 UTC

git commit: KAFKA-924. Specify console consumer properties via a single --property command line parameter; reviewed by Neha Narkhede

Repository: kafka
Updated Branches:
  refs/heads/trunk 0ccc1dc08 -> f8ea4ac01


KAFKA-924. Specify console consumer properties via a single --property command line parameter; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8ea4ac0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8ea4ac0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8ea4ac0

Branch: refs/heads/trunk
Commit: f8ea4ac01d63831d7f5e170d7669e92fca73b673
Parents: 0ccc1dc
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Tue May 27 14:55:53 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue May 27 14:56:00 2014 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsoleConsumer.scala  | 111 +++++--------------
 1 file changed, 28 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f8ea4ac0/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 24c9287..1a16c69 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -54,47 +54,11 @@ object ConsoleConsumer extends Logging {
             .withRequiredArg
             .describedAs("urls")
             .ofType(classOf[String])
-    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+
+    val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
             .withRequiredArg
-            .describedAs("gid")
-            .defaultsTo("console-consumer-" + new Random().nextInt(100000))
+            .describedAs("config file")
             .ofType(classOf[String])
-    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
-            .withRequiredArg
-            .describedAs("size")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(1024 * 1024)
-    val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.")
-            .withRequiredArg
-            .describedAs("bytes")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(1)
-    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
-            .withRequiredArg
-            .describedAs("ms")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(100)
-    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
-            .withRequiredArg
-            .describedAs("size")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(2 * 1024 * 1024)
-    val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker")
-            .withRequiredArg
-            .describedAs("ms")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(ConsumerConfig.SocketTimeout)
-    val refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata")
-            .withRequiredArg
-            .describedAs("ms")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(ConsumerConfig.RefreshMetadataBackoffMs)
-    val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
-            "of time without incoming messages")
-            .withRequiredArg
-            .describedAs("prop")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(-1)
     val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
             .withRequiredArg
             .describedAs("class")
@@ -107,11 +71,6 @@ object ConsoleConsumer extends Logging {
     val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
             "start with the earliest message present in the log rather than the latest message.")
-    val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
-            .withRequiredArg
-            .describedAs("ms")
-            .ofType(classOf[java.lang.Integer])
-            .defaultsTo(ConsumerConfig.AutoCommitInterval)
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
             .withRequiredArg
             .describedAs("num_messages")
@@ -124,14 +83,8 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
-    val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.")
-    val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).")
-            .withRequiredArg
-            .describedAs("Offsets storage method.")
-            .ofType(classOf[String])
-            .defaultsTo("zookeeper")
-    val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.")
 
+    var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
@@ -160,55 +113,47 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
+
+
+    val consumerProps = if (options.has(consumerConfigOpt))
+      Utils.loadProps(options.valueOf(consumerConfigOpt))
+    else
+      new Properties()
+
+    if(!consumerProps.containsKey("group.id")) {
+      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
+      groupIdPassed=false
+    }
+    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
+    if(!consumerProps.containsKey("dual.commit.enabled"))
+      consumerProps.put("dual.commit.enabled","false")
+    if(!consumerProps.containsKey("offsets.storage"))
+      consumerProps.put("offsets.storage","zookeeper")
+
     if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
-       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+ "/offsets")) {
-      System.err.println("Found previous offset information for this group "+options.valueOf(groupIdOpt)
+       checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
+      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
         +". Please use --delete-consumer-offsets to delete previous offsets metadata")
       System.exit(1)
     }
 
     if(options.has(deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
-
-    val offsetsStorage = options.valueOf(offsetsStorageOpt)
-    val props = new Properties()
-    props.put("group.id", options.valueOf(groupIdOpt))
-    props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
-    props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString)
-    props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
-    props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
-    props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
-    props.put("auto.commit.enable", "true")
-    props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
-    props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
-    props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-    props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
-    props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString)
-    props.put("offsets.storage", offsetsStorage)
-    if (options.has(includeInternalTopicsOpt))
-      props.put("exclude.internal.topics", "false")
-    if (options.has(dualCommitEnabledOpt))
-      props.put("dual.commit.enabled", "true")
-    else
-      props.put("dual.commit.enabled", "false")
+      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
 
-    val config = new ConsumerConfig(props)
+    val config = new ConsumerConfig(consumerProps)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
-
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
-
     val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-
     val connector = Consumer.create(config)
 
-
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
         connector.shutdown()
         // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!options.has(groupIdOpt))
-          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
+        if(!groupIdPassed)
+          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
       }
     })