You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/22 08:22:18 UTC
kafka git commit: MINOR: KAFKA-3176 follow-up to fix minor issues
Repository: kafka
Updated Branches:
refs/heads/trunk 8b7dce87a -> 10bbffd75
MINOR: KAFKA-3176 follow-up to fix minor issues
Co-authored with ijuma.
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #1536 from vahidhashemian/minor/KAFKA-3176-Followup
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10bbffd7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10bbffd7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10bbffd7
Branch: refs/heads/trunk
Commit: 10bbffd75439e10fe9db6cf0aa48a7da7e386ef3
Parents: 8b7dce8
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed Jun 22 10:03:29 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Jun 22 10:03:29 2016 +0200
----------------------------------------------------------------------
.../scala/kafka/consumer/BaseConsumer.scala | 42 ++++++++------
.../scala/kafka/tools/ConsoleConsumer.scala | 58 ++++++++++----------
2 files changed, 56 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/10bbffd7/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index b39da19..6e232a8 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -67,22 +67,32 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
var recordIter = consumer.poll(0).iterator
def consumerInit() {
- if (topic.isDefined)
- if (partitionId.isDefined) {
- val topicPartition = new TopicPartition(topic.get, partitionId.get)
- consumer.assign(List(topicPartition))
- offset.get match {
- case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
- case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
- case _ => consumer.seek(topicPartition, offset.get)
- }
- }
- else
- consumer.subscribe(List(topic.get))
- else if (whitelist.isDefined)
- consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
- else
- throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
+ (topic, partitionId, offset, whitelist) match {
+ case (Some(topic), Some(partitionId), Some(offset), None) =>
+ seek(topic, partitionId, offset)
+ case (Some(topic), Some(partitionId), None, None) =>
+ // default to latest if no offset is provided
+ seek(topic, partitionId, OffsetRequest.LatestTime)
+ case (Some(topic), None, None, None) =>
+ consumer.subscribe(List(topic))
+ case (None, None, None, Some(whitelist)) =>
+ consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener())
+ case _ =>
+ throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
+ "Exactly one of 'topic' or 'whitelist' must be provided. " +
+ "If 'topic' is provided, an optional 'partition' may also be provided. " +
+ "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.")
+ }
+ }
+
+ def seek(topic: String, partitionId: Int, offset: Long) {
+ val topicPartition = new TopicPartition(topic, partitionId)
+ consumer.assign(List(topicPartition))
+ offset match {
+ case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
+ case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
+ case _ => consumer.seek(topicPartition, offset)
+ }
}
override def receive(): BaseConsumerRecord = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/10bbffd7/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0b6502a..17cf5bd 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,8 +19,7 @@ package kafka.tools
import java.io.PrintStream
import java.util.concurrent.CountDownLatch
-import java.util.{Properties, Random}
-
+import java.util.{Locale, Properties, Random}
import joptsimple._
import kafka.api.OffsetRequest
import kafka.common.{MessageFormatter, StreamEndException}
@@ -34,7 +33,6 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.serialization.Deserializer
import org.apache.kafka.common.utils.Utils
import org.apache.log4j.Logger
-
import scala.collection.JavaConverters._
/**
@@ -225,7 +223,7 @@ object ConsoleConsumer extends Logging {
.withRequiredArg
.describedAs("consume offset")
.ofType(classOf[String])
- .defaultsTo("earliest")
+ .defaultsTo("latest")
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
@@ -296,8 +294,6 @@ object ConsoleConsumer extends Logging {
// If using new consumer, topic must be specified.
var topicArg: String = null
var whitelistArg: String = null
- var partitionArg: Option[Int] = None
- var offsetArg: Long = OffsetRequest.LatestTime
var filterSpec: TopicFilter = null
val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
val consumerProps = if (options.has(consumerConfigOpt))
@@ -306,7 +302,7 @@ object ConsoleConsumer extends Logging {
new Properties()
val zkConnectionStr = options.valueOf(zkConnectOpt)
val fromBeginning = options.has(resetBeginningOpt)
- partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
+ val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
@@ -331,33 +327,39 @@ object ConsoleConsumer extends Logging {
topicArg = options.valueOf(topicOrFilterOpt.head)
filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
}
-
+
+ if (!useNewConsumer && (partitionArg.isDefined || options.has(offsetOpt)))
+ CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
+
if (partitionArg.isDefined) {
- if (!useNewConsumer)
- CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
if (!options.has(topicIdOpt))
CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is specified.")
if (fromBeginning && options.has(offsetOpt))
CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot be specified together.")
- if (options.has(offsetOpt) &&
- !(options.valueOf(offsetOpt).toLowerCase().equals("earliest") ||
- options.valueOf(offsetOpt).toLowerCase().equals("latest") ||
- (options.valueOf(offsetOpt) forall Character.isDigit)))
- CommandLineUtils.printUsageAndDie(parser, "The provided offset value is incorrect. Valid values are 'earliest', 'latest', or non-negative numbers.")
- } else if (options.has(offsetOpt)) {
- if (!useNewConsumer)
- CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
- else
- CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.")
- }
-
- offsetArg = if (options.has(offsetOpt)) {
- options.valueOf(offsetOpt).toLowerCase() match {
- case "earliest" => OffsetRequest.EarliestTime
- case "latest" => OffsetRequest.LatestTime
- case _ => options.valueOf(offsetOpt).toLong
+ } else if (options.has(offsetOpt))
+ CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.")
+
+ def invalidOffset(offset: String): Nothing =
+ CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
+ "'earliest', 'latest', or a non-negative long.")
+
+ val offsetArg =
+ if (options.has(offsetOpt)) {
+ options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match {
+ case "earliest" => OffsetRequest.EarliestTime
+ case "latest" => OffsetRequest.LatestTime
+ case offsetString =>
+ val offset =
+ try offsetString.toLong
+ catch {
+ case e: NumberFormatException => invalidOffset(offsetString)
+ }
+ if (offset < 0) invalidOffset(offsetString)
+ offset
+ }
}
- } else if (fromBeginning) OffsetRequest.EarliestTime else OffsetRequest.LatestTime
+ else if (fromBeginning) OffsetRequest.EarliestTime
+ else OffsetRequest.LatestTime
CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)