You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/06/22 08:22:18 UTC

kafka git commit: MINOR: KAFKA-3176 follow-up to fix minor issues

Repository: kafka
Updated Branches:
  refs/heads/trunk 8b7dce87a -> 10bbffd75


MINOR: KAFKA-3176 follow-up to fix minor issues

Co-authored with ijuma.

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #1536 from vahidhashemian/minor/KAFKA-3176-Followup


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/10bbffd7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/10bbffd7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/10bbffd7

Branch: refs/heads/trunk
Commit: 10bbffd75439e10fe9db6cf0aa48a7da7e386ef3
Parents: 8b7dce8
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Wed Jun 22 10:03:29 2016 +0200
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Wed Jun 22 10:03:29 2016 +0200

----------------------------------------------------------------------
 .../scala/kafka/consumer/BaseConsumer.scala     | 42 ++++++++------
 .../scala/kafka/tools/ConsoleConsumer.scala     | 58 ++++++++++----------
 2 files changed, 56 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/10bbffd7/core/src/main/scala/kafka/consumer/BaseConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index b39da19..6e232a8 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -67,22 +67,32 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset:
   var recordIter = consumer.poll(0).iterator
 
   def consumerInit() {
-    if (topic.isDefined)
-      if (partitionId.isDefined) {
-        val topicPartition = new TopicPartition(topic.get, partitionId.get)
-        consumer.assign(List(topicPartition))
-        offset.get match {
-          case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
-          case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
-          case _ => consumer.seek(topicPartition, offset.get)
-        }
-      }
-      else
-        consumer.subscribe(List(topic.get))
-    else if (whitelist.isDefined)
-      consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener())
-    else
-      throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.")
+    (topic, partitionId, offset, whitelist) match {
+      case (Some(topic), Some(partitionId), Some(offset), None) =>
+        seek(topic, partitionId, offset)
+      case (Some(topic), Some(partitionId), None, None) =>
+        // default to latest if no offset is provided
+        seek(topic, partitionId, OffsetRequest.LatestTime)
+      case (Some(topic), None, None, None) =>
+        consumer.subscribe(List(topic))
+      case (None, None, None, Some(whitelist)) =>
+        consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener())
+      case _ =>
+        throw new IllegalArgumentException("An invalid combination of arguments is provided. " +
+            "Exactly one of 'topic' or 'whitelist' must be provided. " +
+            "If 'topic' is provided, an optional 'partition' may also be provided. " +
+            "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.")
+    }
+  }
+
+  def seek(topic: String, partitionId: Int, offset: Long) {
+    val topicPartition = new TopicPartition(topic, partitionId)
+    consumer.assign(List(topicPartition))
+    offset match {
+      case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition))
+      case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition))
+      case _ => consumer.seek(topicPartition, offset)
+    }
   }
 
   override def receive(): BaseConsumerRecord = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/10bbffd7/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 0b6502a..17cf5bd 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -19,8 +19,7 @@ package kafka.tools
 
 import java.io.PrintStream
 import java.util.concurrent.CountDownLatch
-import java.util.{Properties, Random}
-
+import java.util.{Locale, Properties, Random}
 import joptsimple._
 import kafka.api.OffsetRequest
 import kafka.common.{MessageFormatter, StreamEndException}
@@ -34,7 +33,6 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.serialization.Deserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.log4j.Logger
-
 import scala.collection.JavaConverters._
 
 /**
@@ -225,7 +223,7 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("consume offset")
       .ofType(classOf[String])
-      .defaultsTo("earliest")
+      .defaultsTo("latest")
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
       "Multiple URLS can be given to allow fail-over.")
       .withRequiredArg
@@ -296,8 +294,6 @@ object ConsoleConsumer extends Logging {
     // If using new consumer, topic must be specified.
     var topicArg: String = null
     var whitelistArg: String = null
-    var partitionArg: Option[Int] = None
-    var offsetArg: Long = OffsetRequest.LatestTime
     var filterSpec: TopicFilter = null
     val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala)
     val consumerProps = if (options.has(consumerConfigOpt))
@@ -306,7 +302,7 @@ object ConsoleConsumer extends Logging {
       new Properties()
     val zkConnectionStr = options.valueOf(zkConnectOpt)
     val fromBeginning = options.has(resetBeginningOpt)
-    partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
+    val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala)
@@ -331,33 +327,39 @@ object ConsoleConsumer extends Logging {
       topicArg = options.valueOf(topicOrFilterOpt.head)
       filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
     }
-    
+
+    if (!useNewConsumer && (partitionArg.isDefined || options.has(offsetOpt)))
+      CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
+
     if (partitionArg.isDefined) {
-      if (!useNewConsumer)
-        CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
       if (!options.has(topicIdOpt))
         CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is specified.")
       if (fromBeginning && options.has(offsetOpt))
         CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot be specified together.")
-      if (options.has(offsetOpt) &&
-          !(options.valueOf(offsetOpt).toLowerCase().equals("earliest") ||
-            options.valueOf(offsetOpt).toLowerCase().equals("latest") ||
-            (options.valueOf(offsetOpt) forall Character.isDigit)))
-        CommandLineUtils.printUsageAndDie(parser, "The provided offset value is incorrect. Valid values are 'earliest', 'latest', or non-negative numbers.")
-    } else if (options.has(offsetOpt)) {
-      if (!useNewConsumer)
-        CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.")
-      else
-        CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.")
-    }
-
-    offsetArg = if (options.has(offsetOpt)) {
-      options.valueOf(offsetOpt).toLowerCase() match {
-        case "earliest" => OffsetRequest.EarliestTime
-        case "latest" => OffsetRequest.LatestTime
-        case _ => options.valueOf(offsetOpt).toLong
+    } else if (options.has(offsetOpt))
+      CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.")
+
+    def invalidOffset(offset: String): Nothing =
+      CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " +
+        "'earliest', 'latest', or a non-negative long.")
+
+    val offsetArg =
+      if (options.has(offsetOpt)) {
+        options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match {
+          case "earliest" => OffsetRequest.EarliestTime
+          case "latest" => OffsetRequest.LatestTime
+          case offsetString =>
+            val offset =
+              try offsetString.toLong
+              catch {
+                case e: NumberFormatException => invalidOffset(offsetString)
+              }
+            if (offset < 0) invalidOffset(offsetString)
+            offset
+        }
       }
-    } else if (fromBeginning) OffsetRequest.EarliestTime else OffsetRequest.LatestTime
+      else if (fromBeginning) OffsetRequest.EarliestTime
+      else OffsetRequest.LatestTime
 
     CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)