Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/05/25 13:28:59 UTC

[GitHub] [kafka] thomaskwscott opened a new pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

thomaskwscott opened a new pull request #10760:
URL: https://github.com/apache/kafka/pull/10760


   See https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp
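
    For illustration, a minimal sketch (not part of this change's code) of how the extended API can be used once this lands, assuming a broker that supports the new `OffsetSpec.maxTimestamp()`:

        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.admin.Admin;
        import org.apache.kafka.clients.admin.AdminClientConfig;
        import org.apache.kafka.clients.admin.ListOffsetsResult;
        import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
        import org.apache.kafka.clients.admin.OffsetSpec;
        import org.apache.kafka.common.TopicPartition;

        public class MaxTimestampExample {
            public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                try (Admin admin = Admin.create(props)) {
                    TopicPartition tp = new TopicPartition("foo", 0);
                    // Ask for the offset and timestamp of the record with the largest timestamp.
                    ListOffsetsResult result = admin.listOffsets(
                        Collections.singletonMap(tp, OffsetSpec.maxTimestamp()));
                    ListOffsetsResultInfo info = result.partitionResult(tp).get();
                    System.out.println("offset=" + info.offset() + ", timestamp=" + info.timestamp());
                }
            }
        }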
   
   
   Tested with a new integration test.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656166220



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,28 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures
+                        partitionsToQuery.stream().forEach(
+                            t -> t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                        .completeExceptionally(
+                                            new UnsupportedVersionException(
+                                                "Broker " + brokerId
+                                                    + " does not support MAX_TIMESTAMP offset spec"))
+                                )
+                        );
+                        return true;

Review comment:
       I've updated to include this.







[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r651831839



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4314,18 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+                        // check if there are any non MAX_TIMESTAMPS partitions left to be downgraded
+                        return partitionsToQuery.stream().anyMatch(
+                            t -> t.partitions().stream().anyMatch(
+                                p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP));
+                    }

Review comment:
       Would it make sense to directly fail the futures of the "max timestamp" requests here and to update `partitionsToQuery` so that it only contains the remaining types? That would consolidate all the fallback logic here, which is simpler to follow. Then, we could add another boolean, `requireMaxTimestamp`, to `ListOffsetsRequest.Builder.forConsumer` and directly pass `supportsMaxTimestamp` to it. This way, `createRequest` and `handleResponse` would remain unchanged. Did you already consider this?
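
       A sketch of what this could look like (hypothetical; exact names and shapes are up to the implementation), replacing the handler above within the same `Call`:

           @Override
           boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
               if (supportsMaxTimestamp) {
                   supportsMaxTimestamp = false;
                   // Fail the futures of all MAX_TIMESTAMP partitions and drop them from
                   // partitionsToQuery, so that createRequest and handleResponse stay unchanged.
                   Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
                   while (topicIterator.hasNext()) {
                       ListOffsetsTopic topic = topicIterator.next();
                       topic.partitions().removeIf(p -> {
                           if (p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
                               futures.get(new TopicPartition(topic.name(), p.partitionIndex()))
                                   .completeExceptionally(new UnsupportedVersionException(
                                       "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
                               return true;
                           }
                           return false;
                       });
                       if (topic.partitions().isEmpty()) {
                           topicIterator.remove();
                       }
                   }
                   // Retry only if something is left to query with a downgraded request.
                   return !partitionsToQuery.isEmpty();
               }
               return false;
           }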

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new ArrayList<>());
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception maxTimestampException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException);
+
+            Exception nopResponseException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp1).get();
+            });
+            assertTrue(nopResponseException.getCause() instanceof ApiException);
+        }
+    }

Review comment:
       Could we also add a unit test for the happy path with max timestamp?
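
       For reference, a happy-path test might look roughly like the sketch below. It is hypothetical and simply follows the mock-environment pattern of the tests above (the response values, offset 1 and timestamp 1000, are made up for illustration):

           @Test
           public void testListOffsetsMaxTimestampHappyPath() throws Exception {
               Node node = new Node(0, "localhost", 8120);
               final Cluster cluster = new Cluster(
                   "mockClusterId",
                   Collections.singletonList(node),
                   Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
                   Collections.emptySet(),
                   Collections.emptySet(),
                   node);
               final TopicPartition tp0 = new TopicPartition("foo", 0);

               try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) {
                   env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
                   env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));

                   // Broker supports MAX_TIMESTAMP: return offset 1 with timestamp 1000 for tp0.
                   ListOffsetsTopicResponse topicResponse =
                       ListOffsetsResponse.singletonListOffsetsTopicResponse(tp0, Errors.NONE, 1000L, 1L, 0);
                   ListOffsetsResponseData responseData = new ListOffsetsResponseData()
                       .setThrottleTimeMs(0)
                       .setTopics(Arrays.asList(topicResponse));
                   env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

                   ListOffsetsResult result = env.adminClient().listOffsets(
                       Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));

                   ListOffsetsResultInfo info = result.partitionResult(tp0).get();
                   assertEquals(1L, info.offset());
                   assertEquals(1000L, info.timestamp());
               }
           }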

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName, 1, 1.asInstanceOf[Short])

Review comment:
       nit: You can use `.toShort`.

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName, 1, 1.asInstanceOf[Short])
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())")

Review comment:
       Yeah, let's remove it.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});

Review comment:
       nit: Could we indent the block such that `}});` is aligned with `ListOffsetsResult`? Same for other tests.

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get

Review comment:
       This block is common in all the test cases that have been added. Could we extract it into a helper method?

##########
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##########
@@ -162,11 +162,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    // produce in 2 batches to ensure the max timestamp matches the last message
+    TestUtils.generateAndProduceMessages(servers, topic, 9)
+    Thread.sleep(10)
+    TestUtils.generateAndProduceMessages(servers, topic, 1)

Review comment:
       Is this really necessary? If so, could we remove the `sleep`?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

Review comment:
       nit: I would move this block up, before calling `listOffsets`, in order to have the response ready. Same for other tests.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);

Review comment:
       nit: We could use `TestUtils.assertFutureThrows` here.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       I think that we should address this as well. I am fine with doing it in a follow-up PR though so we can keep this focused. Ok for you?

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+
+  }
+
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(7L, log.logEndOffset)
+    assertEquals(5L, maxTimestampOffset.get.offset)

Review comment:
       Should we check the timestamp here and in the other tests as well?

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2072,6 +2072,30 @@ class LogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
   }
 
+  @Test
+  def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1

Review comment:
       Should we produce a third record with a timestamp lower than this one to ensure that the API returns the maximum one and not the latest?







[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r655535673



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,28 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures
+                        partitionsToQuery.stream().forEach(
+                            t -> t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                        .completeExceptionally(
+                                            new UnsupportedVersionException(
+                                                "Broker " + brokerId
+                                                    + " does not support MAX_TIMESTAMP offset spec"))
+                                )
+                        );
+                        return true;

Review comment:
       Could we return `true` only if there were `MAX_TIMESTAMP` partitions? We won't need to retry if there were none.
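
       A sketch of the suggested condition (hypothetical; computed before the futures are failed):

           // Determine whether this request contained any MAX_TIMESTAMP specs. If not,
           // the UnsupportedVersionException has another cause and retrying will not help.
           boolean hadMaxTimestamp = partitionsToQuery.stream().anyMatch(
               t -> t.partitions().stream().anyMatch(
                   p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
           // ... fail the MAX_TIMESTAMP futures as above ...
           return hadMaxTimestamp;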







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640543594



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       In all cases I can find, the two are updated together, so I think we can assume consistency. For the topic liveness case in the KIP, absolute consistency is not required, but there will be other cases that need it (e.g. topic inspection).







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657051441



##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -149,6 +179,21 @@ class LogOffsetTest extends BaseRequestTest {
     assertFalse(offsetChanged)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, log.logEndOffset)
+    assertEquals(0L, maxTimestampOffset.get.offset)
+    assertEquals(-1L, maxTimestampOffset.get.timestamp)
+

Review comment:
       done

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -266,4 +311,14 @@ class LogOffsetTest extends BaseRequestTest {
       .partitions.asScala.find(_.partitionIndex == tp.partition).get
   }
 
+  private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
+

Review comment:
       done

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+

Review comment:
       done

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+

Review comment:
       done

##########
File path: clients/src/main/resources/common/message/ListOffsetsRequest.json
##########
@@ -30,7 +30,9 @@
   // Version 5 is the same as version 4.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 enables listing offsets by max timestamp.

Review comment:
       done







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r642967602



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       Ah, I missed that pattern. I'll fix it up to be consistent with the other examples.
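
       For instance, the consolidated form could look roughly like the sketch below (hypothetical; it assumes the `requireMaxTimestamp`-style flag on `forConsumer` suggested earlier in the review):

           @Override
           ListOffsetsRequest.Builder createRequest(int timeoutMs) {
               // supportsMaxTimestamp gates whether a v7+ (MAX_TIMESTAMP-capable) request may be built.
               return ListOffsetsRequest.Builder
                   .forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp)
                   .setTargetTimes(partitionsToQuery);
           }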







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r654459694



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});

Review comment:
       checkstyle is not letting me :-(

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName, 1, 1.asInstanceOf[Short])
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())")

Review comment:
       removed

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);

Review comment:
       updated

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);

Review comment:
       done

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4227,137 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                Map<TopicPartition, ListOffsetsResultInfo> offsets = result.all().get(3, TimeUnit.SECONDS);
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception exception = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(exception.getCause() instanceof UnsupportedVersionException);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(new ArrayList<>());
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            Exception maxTimestampException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp0).get();
+            });
+            assertTrue(maxTimestampException.getCause() instanceof UnsupportedVersionException);
+
+            Exception nopResponseException = assertThrows(ExecutionException.class, () -> {
+                result.partitionResult(tp1).get();
+            });
+            assertTrue(nopResponseException.getCause() instanceof ApiException);
+        }
+    }

Review comment:
       I added it to the existing happy path test

##########
File path: core/src/test/scala/unit/kafka/log/LogTest.scala
##########
@@ -2072,6 +2072,30 @@ class LogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
   }
 
+  @Test
+  def testFetchOffsetByTimestampWithMaxTimestampIncludesTimestamp(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1)
+    val log = createLog(logDir, logConfig)
+
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val leaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = leaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1

Review comment:
       done

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+
+  }
+
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get
+
+    for (timestamp <- List(0L, 1L, 2L, 3L, 4L, 6L, 5L))
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp), leaderEpoch = 0)
+    log.flush()
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(7L, log.logEndOffset)
+    assertEquals(5L, maxTimestampOffset.get.offset)

Review comment:
       updated

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -92,6 +92,57 @@ class LogOffsetTest extends BaseRequestTest {
     assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 3L), consumerOffsets)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+
+    createTopic(topic, 1, 1)
+
+    val logManager = server.getLogManager
+    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
+      "Log for partition [topic,0] should be created")
+    val log = logManager.getLog(topicPartition).get

Review comment:
       sure, I refactored it out.

##########
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##########
@@ -162,11 +162,15 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
     val firstLeaderId = partitionToLeader(partition.partition)
 
-    TestUtils.generateAndProduceMessages(servers, topic, 10)
+    // produce in 2 batches to ensure the max timestamp matches the last message
+    TestUtils.generateAndProduceMessages(servers, topic, 9)
+    Thread.sleep(10)
+    TestUtils.generateAndProduceMessages(servers, topic, 1)

Review comment:
       This test had become flaky because multiple messages could end up with the same timestamp; the pause ensured the last message had the highest one. I've now removed the sleep and instead set explicit timestamps on the test messages, as sketched below.
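
   As a sketch of that fix (illustrative only: it assumes an already-configured `Producer<byte[], byte[]>` named `producer`, and the topic name is made up, not taken from the PR):

   ```
   long baseTimestamp = System.currentTimeMillis();
   for (int i = 0; i < 10; i++) {
       // ProducerRecord(topic, partition, timestamp, key, value): assigning a
       // strictly increasing timestamp guarantees the last record carries the max.
       producer.send(new ProducerRecord<>("test-topic", 0, baseTimestamp + i,
           null, ("message-" + i).getBytes(StandardCharsets.UTF_8)));
   }
   producer.flush();
   ```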




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r658631929



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request contains max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));

Review comment:
       done

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request contains max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(
+                // ensure that no max timestamp requests are retried
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
+                new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsUnsupportedNonMaxTimestamp() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request doesn't contain max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.latest());
+                }});

Review comment:
       done

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -114,7 +114,9 @@ object ApiVersion {
     // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
     KAFKA_2_8_IV1,
     // Introduce AllocateProducerIds (KIP-730)
-    KAFKA_3_0_IV0
+    KAFKA_3_0_IV0,
+    // Introduce ListOffsets maxTimestamps (KIP-734)

Review comment:
       done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656165498



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,28 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures
+                        partitionsToQuery.stream().forEach(
+                            t -> t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                        .completeExceptionally(
+                                            new UnsupportedVersionException(
+                                                "Broker " + brokerId
+                                                    + " does not support MAX_TIMESTAMP offset spec"))
+                                )
+                        );

Review comment:
       good call, I've updated to do this.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640640161



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       It seems that the current logic sends only one request per broker/leader, whereas with your PR we could send up to two requests because specs are partitioned by `Node` and `ListOffsetRequestVersion`. Previously, they were partitioned only by `Node`.
   
   Intuitively, I would have approached the problem differently. I would have put all the specs in the same request and constrained its version to 7 and above if there is at least one `MAX_TIMESTAMP`. If the request succeeds, all good. If the request fails with an `UnsupportedVersionException`, I would have retried with all the specs except the `MAX_TIMESTAMP` ones and failed the futures of the `MAX_TIMESTAMP` specs.
   
   In case of an `UnsupportedVersionException`, the admin client calls the `handleUnsupportedVersionException` method of the `Call`. This gives you an opportunity to downgrade and to retry the `Call`. There are a couple of examples in the `KafkaAdminClient`.
   
   I wonder if we could rely on a similar pattern and avoid sending two requests per leader in the worst case. What do you think?
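
   For reference, a minimal sketch of that pattern (the `Call` hook, `supportsMaxTimestamp` flag, and `partitionsToQuery` list mirror the diffs quoted in this thread; `failAndRemoveMaxTimestampPartitions()` is a hypothetical stand-in for the inline loop shown later, not an existing method):

   ```
   @Override
   boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
       if (supportsMaxTimestamp) {
           // Downgrade once: fail the futures of the MAX_TIMESTAMP specs, strip
           // them from the request, and ask the admin client to retry the Call
           // with only the remaining specs.
           supportsMaxTimestamp = false;
           failAndRemoveMaxTimestampPartitions();
           return !partitionsToQuery.isEmpty();
       }
       // Nothing left to downgrade, so the whole Call fails.
       return false;
   }
   ```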




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r658512975



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request contains max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(
+                // ensure that no max timestamp requests are retried
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
+                new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsUnsupportedNonMaxTimestamp() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request doesn't contain max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.latest());
+                }});

Review comment:
       nit: We could use `singletonMap` here.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                // ensure that the initial request contains max timestamp requests
+                request -> request instanceof ListOffsetsRequest && ((ListOffsetsRequest) request).topics().stream()
+                    .flatMap(t -> t.partitions().stream())
+                    .anyMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));

Review comment:
       As discussed yesterday, the matcher is not called, so I think we should remove the logic here as it is misleading. The condition does not add much anyway. Please check the other usages of `prepareUnsupportedVersionResponse`.

##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -114,7 +114,9 @@ object ApiVersion {
     // Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516)
     KAFKA_2_8_IV1,
     // Introduce AllocateProducerIds (KIP-730)
-    KAFKA_3_0_IV0
+    KAFKA_3_0_IV0,
+    // Introduce ListOffsets maxTimestamps (KIP-734)

Review comment:
       Could we say `Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734)` instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869730993


   @vvcephei filed https://issues.apache.org/jira/browse/KAFKA-13002, please take a look and see if it's related to this.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r655505767



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4314,18 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+                        // check if there are any non MAX_TIMESTAMPS partitions left to be downgraded
+                        return partitionsToQuery.stream().anyMatch(
+                            t -> t.partitions().stream().anyMatch(
+                                p -> p.timestamp() != ListOffsetsRequest.MAX_TIMESTAMP));
+                    }

Review comment:
       that makes sense, I've changed it to work that way now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869749901






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657007003



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions from the downgraded retry
+                        List<ListOffsetsTopic> topicsToRemove = new ArrayList<>();
+                        partitionsToQuery.stream().forEach(
+                            t -> {
+                                List<ListOffsetsPartition> partitionsToRemove = new ArrayList<>();
+                                t.partitions().stream()
+                                    .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                    .forEach(
+                                        p -> {
+                                            futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                                .completeExceptionally(
+                                                    new UnsupportedVersionException(
+                                                        "Broker " + brokerId
+                                                            + " does not support MAX_TIMESTAMP offset spec"));
+                                            partitionsToRemove.add(p);
+
+                                        });
+                                t.partitions().removeAll(partitionsToRemove);
+                                if (t.partitions().isEmpty()) topicsToRemove.add(t);
+                            }
+                        );
+                        partitionsToQuery.removeAll(topicsToRemove);
+
+                        return !partitionsToQuery.isEmpty();

Review comment:
       good point, I've added a check for this.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);

Review comment:
       sure thing, updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-868463282


   Failures are not related:
   ```
   Build / JDK 16 and Scala 2.13 / kafka.server.ReplicationQuotasTest.shouldThrottleOldSegments()
   Build / JDK 16 and Scala 2.13 / org.apache.kafka.streams.integration.KTableKTableForeignKeyJoinDistributedTest.shouldBeInitializedWithDefaultSerde
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 8 and Scala 2.12 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 16 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[1] tlsProtocol=TLSv1.2, useInlinePem=false
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[2] tlsProtocol=TLSv1.2, useInlinePem=true
   Build / JDK 11 and Scala 2.13 / org.apache.kafka.common.network.SslTransportLayerTest.[3] tlsProtocol=TLSv1.3, useInlinePem=false
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] zhishengzhang commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
zhishengzhang commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-1077361506


   good job


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac merged pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
dajac merged pull request #10760:
URL: https://github.com/apache/kafka/pull/10760


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r647639491



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       @dajac I changed this to use the approach discussed here; I'd appreciate it if you could give it another review.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657032428



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
##########
@@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int replicaId) {
             return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
+        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {

Review comment:
       added




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-859299406


   @thomaskwscott Thanks for the update. I will review it next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657052453



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
##########
@@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int replicaId) {
             return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
+        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {

Review comment:
       yep, I added a small one to verify the oldest versions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r655535240



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,28 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures
+                        partitionsToQuery.stream().forEach(
+                            t -> t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                        .completeExceptionally(
+                                            new UnsupportedVersionException(
+                                                "Broker " + brokerId
+                                                    + " does not support MAX_TIMESTAMP offset spec"))
+                                )
+                        );

Review comment:
       Should we remove the `MAX_TIMESTAMP` ones from `partitionsToQuery` as well? Otherwise, they will be retried.
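
   For example (illustrative only; `removeIf` on the mutable lists would be one way to do it):

   ```
   partitionsToQuery.forEach(t ->
       t.partitions().removeIf(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP));
   partitionsToQuery.removeIf(t -> t.partitions().isEmpty());
   ```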




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656527325



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       sure thing, raised: https://issues.apache.org/jira/browse/KAFKA-12981







[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657710021



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,42 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+
+                    // if no max timestamp requests were submitted we should not retry
+                    if (partitionsToQuery.stream()
+                        .flatMap(t -> t.partitions().stream())
+                        .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP))
+                        return false;

Review comment:
       As we already iterate over all the partitions below, would it be reasonable to check this there? We could set a boolean flag if there is at least one `MAX_TIMESTAMP`. Also, it would be great if we could add a unit test that covers this case.
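
       A sketch of the flag-based variant being suggested here (shape assumed; `supportsMaxTimestamp`, `partitionsToQuery`, `futures` and `brokerId` are the names already used in this handler):

    ```
    @Override
    boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
        if (supportsMaxTimestamp) {
            supportsMaxTimestamp = false;
            // Fail MAX_TIMESTAMP futures and drop them from the retried request,
            // noting whether at least one such spec was present.
            boolean foundMaxTimestampPartition = false;
            Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
            while (topicIterator.hasNext()) {
                ListOffsetsTopic topic = topicIterator.next();
                Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
                while (partitionIterator.hasNext()) {
                    ListOffsetsPartition partition = partitionIterator.next();
                    if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
                        foundMaxTimestampPartition = true;
                        futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
                            .completeExceptionally(new UnsupportedVersionException(
                                "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
                        partitionIterator.remove();
                    }
                }
                if (topic.partitions().isEmpty())
                    topicIterator.remove();
            }
            // Retry (downgraded) only if a MAX_TIMESTAMP spec triggered the error
            // and there is still something left to query.
            return foundMaxTimestampPartition && !partitionsToQuery.isEmpty();
        }
        return false;
    }
    ```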







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640552093



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       That's right; as I read it, we would send separate requests even under the old logic (i.e. for LATEST_TIMESTAMP and EARLIEST_TIMESTAMP). The only difference here is that we limit the versions for MAX_TIMESTAMP.







[GitHub] [kafka] ijuma commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869776967


   Thanks for the quick investigation!





[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657007472



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1444,43 +1450,63 @@ private DeleteGroupsResponse createDeleteGroupsResponse() {
         );
     }
 
-    private ListOffsetsRequest createListOffsetRequest(int version) {
+    private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) {
         if (version == 0) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetsPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)
+                            .setTimestamp(timestamp)
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
         } else if (version == 1) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetsPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)
+                            .setTimestamp(timestamp)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
-        } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
+        } else if (version >= 2 && version <= 6) {
             ListOffsetsPartition partition = new ListOffsetsPartition()
                     .setPartitionIndex(0)
-                    .setTimestamp(1000000L)
+                    .setTimestamp(timestamp)
                     .setCurrentLeaderEpoch(5);
 
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
+        } else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) {
+            ListOffsetsPartition partition = new ListOffsetsPartition()
+                .setPartitionIndex(0)
+                .setTimestamp(timestamp)
+                .setCurrentLeaderEpoch(5);
+
+            ListOffsetsTopic topic = new ListOffsetsTopic()
+                .setName("test")
+                .setPartitions(Arrays.asList(partition));
+            if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                return ListOffsetsRequest.Builder
+                        .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+                        .setTargetTimes(Collections.singletonList(topic))
+                        .build((short) version);
+            } else {

Review comment:
       removed it.







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657007220



##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -302,10 +302,16 @@ public void testSerialization() throws Exception {
         checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true);
         checkResponse(createDeleteGroupsResponse(), 0, true);
         for (short version : LIST_OFFSETS.allVersions()) {
-            checkRequest(createListOffsetRequest(version), true);
-            checkErrorResponse(createListOffsetRequest(version), unknownServerException, true);
+            checkRequest(createListOffsetRequest(version, 1000000L), true);
+            checkErrorResponse(createListOffsetRequest(version, 1000000L), unknownServerException, true);
             checkResponse(createListOffsetResponse(version), version, true);
         }
+        LIST_OFFSETS.allVersions().stream().filter(version -> version >= (short) 7).forEach(
+            version -> {
+                checkRequest(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), true);
+                checkErrorResponse(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true);
+            }
+        );

Review comment:
       coolio, have removed it.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -432,8 +438,8 @@ public void testSerialization() throws Exception {
         checkRequest(createUpdateMetadataRequest(5, null), false);
         checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true);
         checkResponse(createUpdateMetadataResponse(), 0, true);
-        checkRequest(createListOffsetRequest(0), true);
-        checkErrorResponse(createListOffsetRequest(0), unknownServerException, true);
+        checkRequest(createListOffsetRequest(0, 1000000L), true);
+        checkErrorResponse(createListOffsetRequest(0, 1000000L), unknownServerException, true);

Review comment:
       yep, removed.







[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869749901


   Looking at it now.





[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r655535240



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,28 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures
+                        partitionsToQuery.stream().forEach(
+                            t -> t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                        .completeExceptionally(
+                                            new UnsupportedVersionException(
+                                                "Broker " + brokerId
+                                                    + " does not support MAX_TIMESTAMP offset spec"))
+                                )
+                        );

Review comment:
       Should we remove the `MAX_TIMESTAMP` ones from `partitionsToQuery` as well? Otherwise, they will be retried.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,28 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures
+                        partitionsToQuery.stream().forEach(
+                            t -> t.partitions().stream()
+                                .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                .forEach(
+                                    p -> futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                        .completeExceptionally(
+                                            new UnsupportedVersionException(
+                                                "Broker " + brokerId
+                                                    + " does not support MAX_TIMESTAMP offset spec"))
+                                )
+                        );
+                        return true;

Review comment:
       Could we return `true` only if there were `MAX_TIMESTAMP` specs? We won't need to retry if there were none.







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640520669



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       At present, the only way `requestVersion` could be `V7AndAbove` is if we were issuing MAX_TIMESTAMP requests, because of the way the calls are parsed earlier:
   
    ```
    ListOffsetRequestVersion requiredRequestVersion = offsetQuery == ListOffsetsRequest.MAX_TIMESTAMP
        ? ListOffsetRequestVersion.V7AndAbove
        : ListOffsetRequestVersion.V0AndAbove;
    ```
   
    All non-max-timestamp requests are built using `forConsumer` rather than `forMaxTimestamp` and so should succeed against older brokers. Maybe the enums are a bit misleading in this regard. I'll see if I can come up with something better.
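
       (Later revisions of this PR move in that direction: the per-Call enum gives way to a boolean that is flipped on downgrade. A rough sketch of that shape, with names taken from the diffs quoted elsewhere in this thread:)

    ```
    // Per-Call downgrade state: starts optimistic, flipped to false once a
    // broker rejects the v7+ request.
    private boolean supportsMaxTimestamp = true;

    @Override
    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
        return ListOffsetsRequest.Builder
            .forConsumer(true, context.options().isolationLevel(), supportsMaxTimestamp)
            .setTargetTimes(partitionsToQuery);
    }
    ```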







[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640544984



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       Oh... I see. So you are saying that if we have two specs for a given leader, say one with MAX_TIMESTAMP and another one with EARLIEST_TIMESTAMP, we send two separate requests to that leader, right?
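
       (For illustration, a usage sketch of the scenario being described, built on the public Admin API from KIP-734; the topic name and an already-open `Admin admin` are assumptions:)

    ```
    Map<TopicPartition, OffsetSpec> specs = new HashMap<>();
    specs.put(new TopicPartition("foo", 0), OffsetSpec.maxTimestamp()); // needs ListOffsets v7+
    specs.put(new TopicPartition("foo", 1), OffsetSpec.earliest());     // any version works

    // Even if both partitions lead on the same broker, the grouping above means
    // the admin client sends that broker two ListOffsets requests, one per bucket.
    ListOffsetsResult result = admin.listOffsets(specs);
    long offsetOfMaxTimestamp = result.partitionResult(new TopicPartition("foo", 0)).get().offset();
    ```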







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640544656



##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName,1,1.asInstanceOf[Short])
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())")

Review comment:
       I used ReassignPartitionsIntegrationTest as a base for creating this, and it has similar messages; I can remove them if needed.







[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-847883470


   I will review it in the next few days.





[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657004623



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions from the downgraded retry
+                        List<ListOffsetsTopic> topicsToRemove = new ArrayList<>();
+                        partitionsToQuery.stream().forEach(
+                            t -> {
+                                List<ListOffsetsPartition> partitionsToRemove = new ArrayList<>();
+                                t.partitions().stream()
+                                    .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                    .forEach(
+                                        p -> {
+                                            futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                                .completeExceptionally(
+                                                    new UnsupportedVersionException(
+                                                        "Broker " + brokerId
+                                                            + " does not support MAX_TIMESTAMP offset spec"));
+                                            partitionsToRemove.add(p);
+
+                                        });
+                                t.partitions().removeAll(partitionsToRemove);
+                                if (t.partitions().isEmpty()) topicsToRemove.add(t);
+                            }
+                        );
+                        partitionsToQuery.removeAll(topicsToRemove);

Review comment:
       agreed, looks much neater.







[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541; Extend ListOffset to fetch offset with max timestamp (KIP-734)

Posted by GitBox <gi...@apache.org>.
dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-869772020


   We have found the issue. @thomaskwscott is working on the fix. My bad, I missed one case.





[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r658632196



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,42 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+
+                    // if no max timestamp requests were submitted we should not retry
+                    if (partitionsToQuery.stream()
+                        .flatMap(t -> t.partitions().stream())
+                        .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP))
+                        return false;

Review comment:
       updated with the flag










[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657051339



##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -93,16 +87,52 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime(): Unit = {
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
 
-    createTopic(topic, 1, 1)
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
 
-    val logManager = server.getLogManager
-    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
-      s"Log for partition $topicPartition should be created")
-    val log = logManager.getLog(topicPartition).get
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+    assertEquals(19L, firstOffset.get.timestamp)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+    assertEquals(-1L, secondOffset.get.timestamp)
+

Review comment:
       done







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656955014



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);

Review comment:
       This was to verify the old logic around failing the futures in handleResponse. I have removed it as it is no longer needed.







[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r656222177



##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -93,16 +87,52 @@ class LogOffsetTest extends BaseRequestTest {
   }
 
   @Test
-  def testGetOffsetsBeforeLatestTime(): Unit = {
+  def testFetchOffsetByTimestampForMaxTimestampAfterTruncate(): Unit = {
     val topic = "kafka-"
     val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
 
-    createTopic(topic, 1, 1)
+    for (timestamp <- 0 until 20)
+      log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes(), timestamp = timestamp.toLong), leaderEpoch = 0)
+    log.flush()
 
-    val logManager = server.getLogManager
-    TestUtils.waitUntilTrue(() => logManager.getLog(topicPartition).isDefined,
-      s"Log for partition $topicPartition should be created")
-    val log = logManager.getLog(topicPartition).get
+    log.updateHighWatermark(log.logEndOffset)
+
+    val firstOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(19L, firstOffset.get.offset)
+    assertEquals(19L, firstOffset.get.timestamp)
+
+    log.truncateTo(0)
+
+    val secondOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, secondOffset.get.offset)
+    assertEquals(-1L, secondOffset.get.timestamp)
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -302,10 +302,16 @@ public void testSerialization() throws Exception {
         checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true);
         checkResponse(createDeleteGroupsResponse(), 0, true);
         for (short version : LIST_OFFSETS.allVersions()) {
-            checkRequest(createListOffsetRequest(version), true);
-            checkErrorResponse(createListOffsetRequest(version), unknownServerException, true);
+            checkRequest(createListOffsetRequest(version, 1000000L), true);
+            checkErrorResponse(createListOffsetRequest(version, 1000000L), unknownServerException, true);
             checkResponse(createListOffsetResponse(version), version, true);
         }
+        LIST_OFFSETS.allVersions().stream().filter(version -> version >= (short) 7).forEach(
+            version -> {
+                checkRequest(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), true);
+                checkErrorResponse(createListOffsetRequest(version, ListOffsetsRequest.MAX_TIMESTAMP), unknownServerException, true);
+            }
+        );

Review comment:
       This does not bring much in this test suite. I think that we can remove it. The only important point is to test all the versions here.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
##########
@@ -54,9 +55,11 @@ public static Builder forReplica(short allowedVersion, int replicaId) {
             return new Builder((short) 0, allowedVersion, replicaId, IsolationLevel.READ_UNCOMMITTED);
         }
 
-        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel) {
+        public static Builder forConsumer(boolean requireTimestamp, IsolationLevel isolationLevel, boolean requireMaxTimestamp) {

Review comment:
       Should we add a small unit test for this change in `clients/src/test/java/org/apache/kafka/common/requests/ListOffsetsRequestTest.java`?

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -1444,43 +1450,63 @@ private DeleteGroupsResponse createDeleteGroupsResponse() {
         );
     }
 
-    private ListOffsetsRequest createListOffsetRequest(int version) {
+    private ListOffsetsRequest createListOffsetRequest(int version, long timestamp) {
         if (version == 0) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetsPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)
+                            .setTimestamp(timestamp)
                             .setMaxNumOffsets(10)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(false, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
         } else if (version == 1) {
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(new ListOffsetsPartition()
                             .setPartitionIndex(0)
-                            .setTimestamp(1000000L)
+                            .setTimestamp(timestamp)
                             .setCurrentLeaderEpoch(5)));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_UNCOMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
-        } else if (version >= 2 && version <= LIST_OFFSETS.latestVersion()) {
+        } else if (version >= 2 && version <= 6) {
             ListOffsetsPartition partition = new ListOffsetsPartition()
                     .setPartitionIndex(0)
-                    .setTimestamp(1000000L)
+                    .setTimestamp(timestamp)
                     .setCurrentLeaderEpoch(5);
 
             ListOffsetsTopic topic = new ListOffsetsTopic()
                     .setName("test")
                     .setPartitions(Arrays.asList(partition));
             return ListOffsetsRequest.Builder
-                    .forConsumer(true, IsolationLevel.READ_COMMITTED)
+                    .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
                     .setTargetTimes(Collections.singletonList(topic))
                     .build((short) version);
+        } else if (version >= 7 && version <= LIST_OFFSETS.latestVersion()) {
+            ListOffsetsPartition partition = new ListOffsetsPartition()
+                .setPartitionIndex(0)
+                .setTimestamp(timestamp)
+                .setCurrentLeaderEpoch(5);
+
+            ListOffsetsTopic topic = new ListOffsetsTopic()
+                .setName("test")
+                .setPartitions(Arrays.asList(partition));
+            if (timestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+                return ListOffsetsRequest.Builder
+                        .forConsumer(true, IsolationLevel.READ_COMMITTED, false)
+                        .setTargetTimes(Collections.singletonList(topic))
+                        .build((short) version);
+            } else {

Review comment:
       We could also remove this. Note that `.forConsumer(true, IsolationLevel.READ_COMMITTED, false)` is used here. The last argument should have been `true`. It shows that it does not really bring any value.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsTopicResponse topicResponse = ListOffsetsResponse.singletonListOffsetsTopicResponse(tp1, Errors.NONE, -1L, 345L, 543);
+            ListOffsetsResponseData responseData = new ListOffsetsResponseData()
+                .setThrottleTimeMs(0)
+                .setTopics(Arrays.asList(topicResponse));
+            env.kafkaClient().prepareResponseFrom(new ListOffsetsResponse(responseData), node);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(new HashMap<TopicPartition, OffsetSpec>() {{
+                    put(tp0, OffsetSpec.maxTimestamp());
+                    put(tp1, OffsetSpec.latest());
+                }});
+
+            TestUtils.assertFutureThrows(result.partitionResult(tp0), UnsupportedVersionException.class);
+
+            ListOffsetsResultInfo tp1Offset = result.partitionResult(tp1).get();
+            assertEquals(345L, tp1Offset.offset());
+            assertEquals(543, tp1Offset.leaderEpoch().get().intValue());
+            assertEquals(-1L, tp1Offset.timestamp());
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampAndNoBrokerResponse() {
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);

Review comment:
       This does not really align with the name of the test. Is it intentional to have it? The test basically verifies that the first request is failed with an `UnsupportedVersionResponse` error and that the second request does not get any response.

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions from the downgraded retry
+                        List<ListOffsetsTopic> topicsToRemove = new ArrayList<>();
+                        partitionsToQuery.stream().forEach(
+                            t -> {
+                                List<ListOffsetsPartition> partitionsToRemove = new ArrayList<>();
+                                t.partitions().stream()
+                                    .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                    .forEach(
+                                        p -> {
+                                            futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                                .completeExceptionally(
+                                                    new UnsupportedVersionException(
+                                                        "Broker " + brokerId
+                                                            + " does not support MAX_TIMESTAMP offset spec"));
+                                            partitionsToRemove.add(p);
+
+                                        });
+                                t.partitions().removeAll(partitionsToRemove);
+                                if (t.partitions().isEmpty()) topicsToRemove.add(t);
+                            }
+                        );
+                        partitionsToQuery.removeAll(topicsToRemove);

Review comment:
       Did you consider using iterators here? This would allow us to remove elements while iterating over the collections. It seems to me that this could make the code a bit more understandable.
   
   I am thinking about something like this:
   ```
   // fail any unsupported futures and remove partitions from the downgraded retry
   Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
   while (topicIterator.hasNext()) {
       ListOffsetsTopic topic = topicIterator.next();
       Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
       while (partitionIterator.hasNext()) {
           ListOffsetsPartition partition = partitionIterator.next();
           if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
               futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
                   .completeExceptionally(new UnsupportedVersionException(
                       "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
               partitionIterator.remove();
           }
       }
       if (topic.partitions().isEmpty()) {
           topicIterator.remove();
       }
   }
   ```
   
   What do you think?

##########
File path: core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
##########
@@ -94,7 +94,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val listOffsetRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_8_IV0) 6
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_3_0_IV0) 7

Review comment:
       We should use `KAFKA_3_0_IV1` here.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -432,8 +438,8 @@ public void testSerialization() throws Exception {
         checkRequest(createUpdateMetadataRequest(5, null), false);
         checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true);
         checkResponse(createUpdateMetadataResponse(), 0, true);
-        checkRequest(createListOffsetRequest(0), true);
-        checkErrorResponse(createListOffsetRequest(0), unknownServerException, true);
+        checkRequest(createListOffsetRequest(0, 1000000L), true);
+        checkErrorResponse(createListOffsetRequest(0, 1000000L), unknownServerException, true);

Review comment:
       Is this redundant with what is already tested above?

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        List<PartitionInfo> pInfos = new ArrayList<>();
+        pInfos.add(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node}));
+        pInfos.add(new PartitionInfo("foo", 1, node, new Node[]{node}, new Node[]{node}));
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            pInfos,
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+        final TopicPartition tp1 = new TopicPartition("foo", 1);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster,
+            AdminClientConfig.RETRIES_CONFIG, "2")) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);

Review comment:
       Could we verify that the request contains only the specs which must be retried?
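   
    For instance, a minimal sketch of such a check (this assumes MockClient's `prepareResponse(matcher, response)` overload; `prepareListOffsetsResponse` is a hypothetical helper for building the reply):
   
    ```
    // Only answer the downgraded retry if every remaining spec is a non-MAX_TIMESTAMP
    // one; if an unsupported spec is re-sent, the mock never matches and the test fails.
    env.kafkaClient().prepareResponse(
        request -> request instanceof ListOffsetsRequest
            && ((ListOffsetsRequest) request).data().topics().stream()
                .flatMap(t -> t.partitions().stream())
                .noneMatch(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP),
        prepareListOffsetsResponse(Errors.NONE)); // hypothetical response helper
    ```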

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4298,6 +4296,39 @@ void handleFailure(Throwable throwable) {
                         }
                     }
                 }
+
+                @Override
+                boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
+                    if (supportsMaxTimestamp) {
+                        supportsMaxTimestamp = false;
+
+                        // fail any unsupported futures and remove partitions from the downgraded retry
+                        List<ListOffsetsTopic> topicsToRemove = new ArrayList<>();
+                        partitionsToQuery.stream().forEach(
+                            t -> {
+                                List<ListOffsetsPartition> partitionsToRemove = new ArrayList<>();
+                                t.partitions().stream()
+                                    .filter(p -> p.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP)
+                                    .forEach(
+                                        p -> {
+                                            futures.get(new TopicPartition(t.name(), p.partitionIndex()))
+                                                .completeExceptionally(
+                                                    new UnsupportedVersionException(
+                                                        "Broker " + brokerId
+                                                            + " does not support MAX_TIMESTAMP offset spec"));
+                                            partitionsToRemove.add(p);
+
+                                        });
+                                t.partitions().removeAll(partitionsToRemove);
+                                if (t.partitions().isEmpty()) topicsToRemove.add(t);
+                            }
+                        );
+                        partitionsToQuery.removeAll(topicsToRemove);
+
+                        return !partitionsToQuery.isEmpty();

Review comment:
       Don't we need to skip the retry if there were no `MAX_TIMESTAMP` specs in the collection? In that case, the `UnsupportedVersionException` is not retriable. This could happen if the admin client talks to a really old broker which does not support v1, for instance.
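   
    A sketch of what I mean, combining this guard with the iterator suggestion above (illustrative only; the `removedMaxTimestampSpec` flag is new here, the rest follows the current patch):
   
    ```
    @Override
    boolean handleUnsupportedVersionException(UnsupportedVersionException exception) {
        if (!supportsMaxTimestamp)
            return false; // already downgraded once, so the error is not ours to handle
        supportsMaxTimestamp = false;
        boolean removedMaxTimestampSpec = false;
        Iterator<ListOffsetsTopic> topicIterator = partitionsToQuery.iterator();
        while (topicIterator.hasNext()) {
            ListOffsetsTopic topic = topicIterator.next();
            Iterator<ListOffsetsPartition> partitionIterator = topic.partitions().iterator();
            while (partitionIterator.hasNext()) {
                ListOffsetsPartition partition = partitionIterator.next();
                if (partition.timestamp() == ListOffsetsRequest.MAX_TIMESTAMP) {
                    futures.get(new TopicPartition(topic.name(), partition.partitionIndex()))
                        .completeExceptionally(new UnsupportedVersionException(
                            "Broker " + brokerId + " does not support MAX_TIMESTAMP offset spec"));
                    partitionIterator.remove();
                    removedMaxTimestampSpec = true;
                }
            }
            if (topic.partitions().isEmpty())
                topicIterator.remove();
        }
        // Retry only if the downgrade actually removed a MAX_TIMESTAMP spec
        // and there is still something left to query.
        return removedMaxTimestampSpec && !partitionsToQuery.isEmpty();
    }
    ```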

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -149,6 +179,21 @@ class LogOffsetTest extends BaseRequestTest {
     assertFalse(offsetChanged)
   }
 
+  @Test
+  def testFetchOffsetByTimestampForMaxTimestampWithEmptyLog(): Unit = {
+    val topic = "kafka-"
+    val topicPartition = new TopicPartition(topic, 0)
+    val log = createTopicAndGetLog(topic, topicPartition)
+
+    log.updateHighWatermark(log.logEndOffset)
+
+    val maxTimestampOffset = log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP)
+    assertEquals(0L, log.logEndOffset)
+    assertEquals(0L, maxTimestampOffset.get.offset)
+    assertEquals(-1L, maxTimestampOffset.get.timestamp)
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+
+        Node node = new Node(0, "localhost", 8120);
+        List<Node> nodes = Collections.singletonList(node);
+        final Cluster cluster = new Cluster(
+            "mockClusterId",
+            nodes,
+            Collections.singleton(new PartitionInfo("foo", 0, node, new Node[]{node}, new Node[]{node})),
+            Collections.emptySet(),
+            Collections.emptySet(),
+            node);
+        final TopicPartition tp0 = new TopicPartition("foo", 0);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster, AdminClientConfig.RETRIES_CONFIG, "2")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponse(prepareMetadataResponse(cluster, Errors.NONE));
+
+            // listoffsets response from broker 0
+            env.kafkaClient().prepareUnsupportedVersionResponse(
+                request -> request instanceof ListOffsetsRequest);
+
+            ListOffsetsResult result = env.adminClient().listOffsets(Collections.singletonMap(tp0, OffsetSpec.maxTimestamp()));
+
+            TestUtils.assertFutureThrows(result.all(), UnsupportedVersionException.class);
+        }
+    }
+
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedMultipleOffsetSpec() throws Exception {
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -4226,6 +4235,124 @@ public void testListOffsetsNonRetriableErrors() throws Exception {
         }
     }
 
+    @Test
+    public void testListOffsetsMaxTimestampUnsupportedSingleOffsetSpec() {
+

Review comment:
       nit: Empty line could be removed.

##########
File path: clients/src/main/resources/common/message/ListOffsetsRequest.json
##########
@@ -30,7 +30,9 @@
   // Version 5 is the same as version 4.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 enables listing offsets by max timestamp.

Review comment:
       Could we add the KIP as well?
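   
    e.g. something like this (wording illustrative):
   
    ```
    // Version 7 enables listing offsets by max timestamp (KIP-734).
    ```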

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       Could we file a JIRA as a subtask under the KIP's JIRA so that we don't forget about it?

##########
File path: clients/src/main/resources/common/message/ListOffsetsResponse.json
##########
@@ -29,7 +29,9 @@
   // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 is the same as version 6.

Review comment:
       Could we add the KIP as well?
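   
    Same as for the request schema, e.g. (wording illustrative):
   
    ```
    // Version 7 is the same as version 6 (KIP-734).
    ```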

##########
File path: core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
##########
@@ -266,4 +311,14 @@ class LogOffsetTest extends BaseRequestTest {
       .partitions.asScala.find(_.partitionIndex == tp.partition).get
   }
 
+  private def createTopicAndGetLog(topic: String, topicPartition: TopicPartition): Log = {
+

Review comment:
       nit: Empty line could be removed.







[GitHub] [kafka] thomaskwscott commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
thomaskwscott commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r657052063



##########
File path: clients/src/main/resources/common/message/ListOffsetsResponse.json
##########
@@ -29,7 +29,9 @@
   // Version 5 adds a new error code, OFFSET_NOT_AVAILABLE.
   //
   // Version 6 enables flexible versions.
-  "validVersions": "0-6",
+  //
+  // Version 7 is the same as version 6.

Review comment:
       done







[GitHub] [kafka] dajac commented on a change in pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#discussion_r640459341



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1316,6 +1316,16 @@ class Log(@volatile private var _dir: File,
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logEndOffset, epochOptional))
+      } else if (targetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
+        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+        // constant time access while being safe to use with concurrent collections unlike `toArray`.
+        val segmentsCopy = logSegments.toBuffer
+        val latestTimestampSegment = segmentsCopy.maxBy(_.maxTimestampSoFar)
+        val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
+        val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
+        Some(new TimestampAndOffset(latestTimestampSegment.maxTimestampSoFar,
+          latestTimestampSegment.offsetOfMaxTimestampSoFar,
+          epochOptional))

Review comment:
       Could we get a `maxTimestampSoFar` and an `offsetOfMaxTimestampSoFar` that do not correspond to each other? It seems that we have no guarantee here. Is that an issue?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4225,76 +4232,84 @@ public ListOffsetsResult listOffsets(Map<TopicPartition, OffsetSpec> topicPartit
             }
         }
 
-        for (final Map.Entry<Node, Map<String, ListOffsetsTopic>> entry : leaders.entrySet()) {
-            final int brokerId = entry.getKey().id();
+        for (final Map.Entry<Node, Map<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>>> versionedEntry : leaders.entrySet()) {
+            for (final Map.Entry<ListOffsetRequestVersion, Map<String, ListOffsetsTopic>> entry : versionedEntry.getValue().entrySet()) {
+                final int brokerId = versionedEntry.getKey().id();
 
-            calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
+                calls.add(new Call("listOffsets on broker " + brokerId, context.deadline(), new ConstantNodeIdProvider(brokerId)) {
 
-                final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
+                    final List<ListOffsetsTopic> partitionsToQuery = new ArrayList<>(entry.getValue().values());
 
-                @Override
-                ListOffsetsRequest.Builder createRequest(int timeoutMs) {
-                    return ListOffsetsRequest.Builder
+                    @Override
+                    ListOffsetsRequest.Builder createRequest(int timeoutMs) {
+                        ListOffsetRequestVersion requestVersion = entry.getKey();
+                        if (requestVersion == ListOffsetRequestVersion.V7AndAbove) {
+                            return ListOffsetsRequest.Builder
+                                .forMaxTimestamp(context.options().isolationLevel())
+                                .setTargetTimes(partitionsToQuery);
+                        }

Review comment:
       I'd like to better understand how we handle a broker which does not support the version that we need.
   
    `ListOffsetsRequest.Builder.forMaxTimestamp` constrains the version to 7 and above when we have at least one max timestamp spec. If the broker does not support version 7, the request fails with an `UnsupportedVersionException` and we fail all of that broker's futures with it in `handleFailure`.
   
    Now, let's imagine a case where the user does not only include "max timestamp" specs in the request. At the moment, we fail all of them irrespective of their type. I wonder if we should retry the other specs in this particular case. Have we considered doing this?
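   
    For illustration, the mixed case I have in mind (a sketch; topic names are made up):
   
    ```
    Map<TopicPartition, OffsetSpec> specs = new HashMap<>();
    specs.put(new TopicPartition("foo", 0), OffsetSpec.maxTimestamp()); // needs v7
    specs.put(new TopicPartition("foo", 1), OffsetSpec.latest());       // works on older brokers
    ListOffsetsResult result = admin.listOffsets(specs);
    // Today both futures fail with UnsupportedVersionException against an old broker;
    // arguably only the maxTimestamp() future has to fail, and latest() could be retried.
    ```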

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName,1,1.asInstanceOf[Short])

Review comment:
       nit: We usually put a space after each comma. There are other cases in the file.

##########
File path: core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
##########
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package integration.kafka.admin
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class ListOffsetsIntegrationTest extends KafkaServerTestHarness {
+
+  val topicName = "foo"
+  var adminClient: Admin = null
+
+  @BeforeEach
+  override def setUp(): Unit = {
+    super.setUp()
+    createTopic(topicName,1,1.asInstanceOf[Short])
+    produceMessages()
+    adminClient = Admin.create(Map[String, Object](
+      AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> brokerList
+    ).asJava)
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    Utils.closeQuietly(adminClient, "ListOffsetsAdminClient")
+    super.tearDown()
+  }
+
+  @Test
+  def testEarliestOffset(): Unit = {
+    val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
+    assertEquals(0,earliestOffset.offset())
+  }
+
+  @Test
+  def testLatestOffset(): Unit = {
+    val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
+    assertEquals(3,latestOffset.offset())
+  }
+
+  @Test
+  def testMaxTimestampOffset(): Unit = {
+    val maxTimestampOffset = runFetchOffsets(adminClient, OffsetSpec.maxTimestamp())
+    assertEquals(1,maxTimestampOffset.offset())
+  }
+
+  private def runFetchOffsets(adminClient: Admin,
+                              offsetSpec: OffsetSpec): ListOffsetsResult.ListOffsetsResultInfo = {
+    println(s"==> listOffsets(${topicName} -> ${offsetSpec}, new ListOffsetsOptions())")

Review comment:
       nit: Should we remove this?



