You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/05/27 23:56:05 UTC
git commit: KAFKA-924. Specify console consumer properties via a
single --property command line parameter; reviewed by Neha Narkhede
Repository: kafka
Updated Branches:
refs/heads/trunk 0ccc1dc08 -> f8ea4ac01
KAFKA-924. Specify console consumer properties via a single --property command line parameter; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f8ea4ac0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f8ea4ac0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f8ea4ac0
Branch: refs/heads/trunk
Commit: f8ea4ac01d63831d7f5e170d7669e92fca73b673
Parents: 0ccc1dc
Author: Sriharsha Chintalapani <sc...@hortonworks.com>
Authored: Tue May 27 14:55:53 2014 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Tue May 27 14:56:00 2014 -0700
----------------------------------------------------------------------
.../scala/kafka/consumer/ConsoleConsumer.scala | 111 +++++--------------
1 file changed, 28 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f8ea4ac0/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 24c9287..1a16c69 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -54,47 +54,11 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
- val groupIdOpt = parser.accepts("group", "The group id to consume on.")
+
+ val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
.withRequiredArg
- .describedAs("gid")
- .defaultsTo("console-consumer-" + new Random().nextInt(100000))
+ .describedAs("config file")
.ofType(classOf[String])
- val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1024 * 1024)
- val minFetchBytesOpt = parser.accepts("min-fetch-bytes", "The min number of bytes each fetch request waits for.")
- .withRequiredArg
- .describedAs("bytes")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(1)
- val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(100)
- val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
- .withRequiredArg
- .describedAs("size")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(2 * 1024 * 1024)
- val socketTimeoutMsOpt = parser.accepts("socket-timeout-ms", "The socket timeout used for the connection to the broker")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(ConsumerConfig.SocketTimeout)
- val refreshMetadataBackoffMsOpt = parser.accepts("refresh-leader-backoff-ms", "Backoff time before refreshing metadata")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(ConsumerConfig.RefreshMetadataBackoffMs)
- val consumerTimeoutMsOpt = parser.accepts("consumer-timeout-ms", "consumer throws timeout exception after waiting this much " +
- "of time without incoming messages")
- .withRequiredArg
- .describedAs("prop")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(-1)
val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
.withRequiredArg
.describedAs("class")
@@ -107,11 +71,6 @@ object ConsoleConsumer extends Logging {
val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up");
val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
"start with the earliest message present in the log rather than the latest message.")
- val autoCommitIntervalOpt = parser.accepts("autocommit.interval.ms", "The time interval at which to save the current offset in ms")
- .withRequiredArg
- .describedAs("ms")
- .ofType(classOf[java.lang.Integer])
- .defaultsTo(ConsumerConfig.AutoCommitInterval)
val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
.withRequiredArg
.describedAs("num_messages")
@@ -124,14 +83,8 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("metrics dictory")
.ofType(classOf[java.lang.String])
- val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.")
- val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).")
- .withRequiredArg
- .describedAs("Offsets storage method.")
- .ofType(classOf[String])
- .defaultsTo("zookeeper")
- val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.")
+ var groupIdPassed = true
val options: OptionSet = tryParse(parser, args)
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
@@ -160,55 +113,47 @@ object ConsoleConsumer extends Logging {
KafkaMetricsReporter.startReporters(verifiableProps)
}
+
+
+ val consumerProps = if (options.has(consumerConfigOpt))
+ Utils.loadProps(options.valueOf(consumerConfigOpt))
+ else
+ new Properties()
+
+ if(!consumerProps.containsKey("group.id")) {
+ consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
+ groupIdPassed=false
+ }
+ consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
+ consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
+ if(!consumerProps.containsKey("dual.commit.enabled"))
+ consumerProps.put("dual.commit.enabled","false")
+ if(!consumerProps.containsKey("offsets.storage"))
+ consumerProps.put("offsets.storage","zookeeper")
+
if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
- checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + options.valueOf(groupIdOpt)+ "/offsets")) {
- System.err.println("Found previous offset information for this group "+options.valueOf(groupIdOpt)
+ checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
+ System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
+". Please use --delete-consumer-offsets to delete previous offsets metadata")
System.exit(1)
}
if(options.has(deleteConsumerOffsetsOpt))
- ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
-
- val offsetsStorage = options.valueOf(offsetsStorageOpt)
- val props = new Properties()
- props.put("group.id", options.valueOf(groupIdOpt))
- props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
- props.put("socket.timeout.ms", options.valueOf(socketTimeoutMsOpt).toString)
- props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
- props.put("fetch.min.bytes", options.valueOf(minFetchBytesOpt).toString)
- props.put("fetch.wait.max.ms", options.valueOf(maxWaitMsOpt).toString)
- props.put("auto.commit.enable", "true")
- props.put("auto.commit.interval.ms", options.valueOf(autoCommitIntervalOpt).toString)
- props.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
- props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
- props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
- props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString)
- props.put("offsets.storage", offsetsStorage)
- if (options.has(includeInternalTopicsOpt))
- props.put("exclude.internal.topics", "false")
- if (options.has(dualCommitEnabledOpt))
- props.put("dual.commit.enabled", "true")
- else
- props.put("dual.commit.enabled", "false")
+ ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
- val config = new ConsumerConfig(props)
+ val config = new ConsumerConfig(consumerProps)
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
-
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = MessageFormatter.tryParseFormatterArgs(options.valuesOf(messageFormatterArgOpt))
-
val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-
val connector = Consumer.create(config)
-
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
connector.shutdown()
// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
- if(!options.has(groupIdOpt))
- ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + options.valueOf(groupIdOpt))
+ if(!groupIdPassed)
+ ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
}
})