You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/31 02:38:55 UTC

[kafka] branch 3.2 updated: MINOR: Fix an incompatible bug in GetOffsetShell (#11936)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 72809cc  MINOR: Fix an incompatible bug in GetOffsetShell (#11936)
72809cc is described below

commit 72809cce9656fa1238c4247c9edb0a1c4c74ff82
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Mar 31 10:34:39 2022 +0800

    MINOR: Fix an incompatible bug in GetOffsetShell (#11936)
    
    In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions were simply ignored if there was no offset for them; with the AdminClient-based implementation, -1 is printed for them instead. This PR fixes this inconsistency by omitting partitions whose offset is unknown.
    
    Reviewers: David Jacot <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
 core/src/main/scala/kafka/tools/GetOffsetShell.scala     | 8 ++++++--
 core/src/test/scala/kafka/tools/GetOffsetShellTest.scala | 7 +++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index d4e81bc..03f9c81 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -22,7 +22,7 @@ import joptsimple._
 import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.requests.ListOffsetsRequest
+import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.Utils
 
 import java.util.Properties
@@ -135,7 +135,11 @@ object GetOffsetShell {
       val partitionOffsets = partitionInfos.flatMap { tp =>
         try {
           val partitionInfo = listOffsetsResult.partitionResult(tp).get
-          Some((tp, partitionInfo.offset))
+          if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
+            Some((tp, partitionInfo.offset))
+          } else {
+            None
+          }
         } catch {
           case e: ExecutionException =>
             e.getCause match {
diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
index 02164266..cbce573 100644
--- a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
+++ b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
@@ -167,6 +167,13 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
   }
 
   @Test
+  def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = {
+    val time = (System.currentTimeMillis() * 2).toString
+    val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time))
+    assertEquals(List.empty, offsets)
+  }
+
+  @Test
   def testTopicPartitionsArgWithInternalExcluded(): Unit = {
     val offsets = executeAndParse(Array("--topic-partitions",
       "topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics"))