You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/10/16 03:15:00 UTC

[jira] [Commented] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning

    [ https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651075#comment-16651075 ] 

ASF GitHub Bot commented on KAFKA-6764:
---------------------------------------

cmccabe closed pull request #5637: MINOR : Fixed KAFKA-6764; Update usage for console-consumer whitelist option
URL: https://github.com/apache/kafka/pull/5637
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (fork) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 365652a75b5..06705d59219 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -65,6 +65,7 @@ object ConsoleConsumer extends Logging {
   def run(conf: ConsumerConfig) {
     val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue
     val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer)
+
     val consumerWrapper =
       if (conf.partitionArg.isDefined)
         new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs)
@@ -194,7 +195,7 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("topic")
       .ofType(classOf[String])
-    val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
+    val whitelistOpt = parser.accepts("whitelist", "Regular expression specifying whitelist of topics to include for consumption.")
       .withRequiredArg
       .describedAs("whitelist")
       .ofType(classOf[String])
@@ -355,7 +356,7 @@ object ConsoleConsumer extends Logging {
     val groupIdsProvided = Set(
       Option(options.valueOf(groupIdOpt)), // via --group
       Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property
-      Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --cosumer.config
+      Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config
     ).flatten
 
     if (groupIdsProvided.size > 1) {
@@ -376,6 +377,9 @@ object ConsoleConsumer extends Logging {
         groupIdPassed = false
     }
 
+    if (groupIdPassed && partitionArg.isDefined)
+      CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.")
+
     def tryParse(parser: OptionParser, args: Array[String]): OptionSet = {
       try
         parser.parse(args: _*)
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 47b7fae3d9b..cdc146f3666 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -432,4 +432,67 @@ class ConsoleConsumerTest {
     assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey)
   }
 
+  @Test
+  def shouldParseGroupIdFromBeginningGivenTogether() {
+    // Start from earliest
+    var args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--from-beginning")
+
+    var config = new ConsoleConsumer.ConsumerConfig(args)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(-2, config.offsetArg)
+    assertEquals(true, config.fromBeginning)
+
+    // Start from latest
+    args = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group"
+    )
+
+    config = new ConsoleConsumer.ConsumerConfig(args)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(-1, config.offsetArg)
+    assertEquals(false, config.fromBeginning)
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnGroupIdAndPartitionGivenTogether() {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--group", "test-group",
+      "--partition", "0")
+
+    //When
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
+
+  @Test(expected = classOf[IllegalArgumentException])
+  def shouldExitOnOffsetWithoutPartition() {
+    Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull))
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--offset", "10")
+
+    //When
+    try {
+      new ConsoleConsumer.ConsumerConfig(args)
+    } finally {
+      Exit.resetExitProcedure()
+    }
+  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning 
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6764
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6764
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Larry McQueary
>            Assignee: Larry McQueary
>            Priority: Minor
>              Labels: newbie
>
> Per its usage statement, {{kafka-console-consumer.sh}} ignores {{\-\-from-beginning}} when the specified consumer group has committed offsets, and sets {{auto.offset.reset}} to {{latest}}. However, if {{\-\-partition}} is also specified, {{\-\-from-beginning}} is observed in all cases, whether there are committed offsets or not.
> This happens because when {{\-\-from-beginning}} is specified, {{offsetArg}} is set to {{OffsetRequest.EarliestTime}}. However, {{offsetArg}} is [only passed to the constructor|https://github.com/apache/kafka/blob/fedac0cea74feeeece529ee1c0cefd6af53ecbdd/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L76-L79] for {{NewShinyConsumer}} when {{\-\-partition}} is also specified. Hence, it is honored in this case and not the other.
> This case should either be handled consistently, or the usage statement should be modified to indicate the actual behavior/usage. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)