You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/03/31 02:38:55 UTC
[kafka] branch 3.2 updated: MINOR: Fix an uncompatible bug in GetOffsetShell (#11936)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 72809cc MINOR: Fix an uncompatible bug in GetOffsetShell (#11936)
72809cc is described below
commit 72809cce9656fa1238c4247c9edb0a1c4c74ff82
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Mar 31 10:34:39 2022 +0800
MINOR: Fix an uncompatible bug in GetOffsetShell (#11936)
In KIP-815 we replaced KafkaConsumer with AdminClient in GetOffsetShell. In the previous implementation, partitions were simply omitted from the output if there was no offset for them; after that change, -1 is printed for them instead. This PR fixes the inconsistency by skipping such partitions again.
Reviewers: David Jacot <dj...@confluent.io>, Luke Chen <sh...@gmail.com>
---
core/src/main/scala/kafka/tools/GetOffsetShell.scala | 8 ++++++--
core/src/test/scala/kafka/tools/GetOffsetShellTest.scala | 7 +++++++
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/GetOffsetShell.scala b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
index d4e81bc..03f9c81 100644
--- a/core/src/main/scala/kafka/tools/GetOffsetShell.scala
+++ b/core/src/main/scala/kafka/tools/GetOffsetShell.scala
@@ -22,7 +22,7 @@ import joptsimple._
import kafka.utils.{CommandLineUtils, Exit, IncludeList, ToolsUtils}
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, ListTopicsOptions, OffsetSpec}
import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.requests.ListOffsetsRequest
+import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
import org.apache.kafka.common.utils.Utils
import java.util.Properties
@@ -135,7 +135,11 @@ object GetOffsetShell {
val partitionOffsets = partitionInfos.flatMap { tp =>
try {
val partitionInfo = listOffsetsResult.partitionResult(tp).get
- Some((tp, partitionInfo.offset))
+ if (partitionInfo.offset != ListOffsetsResponse.UNKNOWN_OFFSET) {
+ Some((tp, partitionInfo.offset))
+ } else {
+ None
+ }
} catch {
case e: ExecutionException =>
e.getCause match {
diff --git a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
index 02164266..cbce573 100644
--- a/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
+++ b/core/src/test/scala/kafka/tools/GetOffsetShellTest.scala
@@ -167,6 +167,13 @@ class GetOffsetShellTest extends KafkaServerTestHarness with Logging {
}
@Test
+ def testNoOffsetIfTimestampGreaterThanLatestRecord(): Unit = {
+ val time = (System.currentTimeMillis() * 2).toString
+ val offsets = executeAndParse(Array("--topic-partitions", "topic.*", "--time", time))
+ assertEquals(List.empty, offsets)
+ }
+
+ @Test
def testTopicPartitionsArgWithInternalExcluded(): Unit = {
val offsets = executeAndParse(Array("--topic-partitions",
"topic1:0,topic2:1,topic(3|4):2,__.*:3", "--exclude-internal-topics"))