Posted to commits@kafka.apache.org by da...@apache.org on 2022/02/03 09:36:50 UTC

[kafka] branch 3.1 updated: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new ae09d9c  KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
ae09d9c is described below

commit ae09d9c32793777fa3d3fc90ce67eb504bec9b2e
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Feb 3 17:32:25 2022 +0800

    KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
    
    We introduced `default.api.timeout.ms` in https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb, but we missed updating `KafkaConsumer.endOffsets`, which still used `request.timeout.ms`. This patch fixes that.
    
    Reviewers: David Jacot <dj...@confluent.io>
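
For reference, a minimal usage sketch of the behavior this change affects. The broker address,
topic name, and class name below are illustrative placeholders; only the two `endOffsets`
overloads and the `default.api.timeout.ms` / `request.timeout.ms` configs come from the patch
itself.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class EndOffsetsTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // The overall bound for endOffsets(Collection) after this patch; 60000 ms is the default.
            props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 60000);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic

                // No explicit timeout: now bounded by default.api.timeout.ms instead of request.timeout.ms.
                Map<TopicPartition, Long> end = consumer.endOffsets(Collections.singleton(tp));

                // An explicit Duration still takes precedence, exactly as before this patch.
                Map<TopicPartition, Long> bounded =
                    consumer.endOffsets(Collections.singleton(tp), Duration.ofSeconds(5));

                System.out.println(end);
                System.out.println(bounded);
            }
        }
    }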
---
 .../kafka/clients/consumer/KafkaConsumer.java      |  4 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 58 +++++++++++++++++++++-
 2 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 286f84b..5e58bee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2186,11 +2186,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} expires
+     *         the amount of time allocated by {@code default.api.timeout.ms} expires
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
-        return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+        return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
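
A side note on the updated contract above: a caller that exceeds the bound now documented as
`default.api.timeout.ms` (60s by default, versus 30s for `request.timeout.ms`) gets a
TimeoutException. A minimal handling sketch; the helper name and the empty-map fallback are
illustrative only, not part of the patch:

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    final class EndOffsetsHelper {
        // Returns the end offset for one partition, or an empty map if the offset could not
        // be fetched within default.api.timeout.ms.
        static Map<TopicPartition, Long> endOffsetOrEmpty(Consumer<?, ?> consumer, TopicPartition tp) {
            try {
                // After this patch, this call is bounded by default.api.timeout.ms,
                // matching the javadoc change above.
                return consumer.endOffsets(Collections.singleton(tp));
            } catch (TimeoutException e) {
                return Collections.emptyMap();
            }
        }
    }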
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2872983..9b79473 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -164,6 +164,8 @@ public class KafkaConsumerTest {
     private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
 
     private final int sessionTimeoutMs = 10000;
+    private final int defaultApiTimeoutMs = 60000;
+    private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
     private final int heartbeatIntervalMs = 1000;
 
     // Set auto commit interval lower than heartbeat so we don't need to deal with
@@ -2618,8 +2620,6 @@ public class KafkaConsumerTest {
         String clientId = "mock-consumer";
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
-        int requestTimeoutMs = 30000;
-        int defaultApiTimeoutMs = 30000;
         int minBytes = 1;
         int maxBytes = Integer.MAX_VALUE;
         int maxWaitMs = 500;
@@ -2948,6 +2948,60 @@ public class KafkaConsumerTest {
             () -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
     }
 
+    @Test
+    public void testOffsetsForTimesTimeout() {
+        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+        );
+    }
+
+    @Test
+    public void testBeginningOffsetsTimeout() {
+        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    @Test
+    public void testEndOffsetsTimeout() {
+        final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+        assertEquals(
+            "Failed to get offsets by times in 60000ms",
+            assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
+        );
+    }
+
+    private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
+        final Time time = new MockTime();
+        SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+        ConsumerMetadata metadata = createMetadata(subscription);
+        MockClient client = new MockClient(time, metadata);
+
+        initMetadata(client, singletonMap(topic, 1));
+
+        ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+
+        for (int i = 0; i < 10; i++) {
+            client.prepareResponse(
+                request -> {
+                    time.sleep(defaultApiTimeoutMs / 10);
+                    return request instanceof ListOffsetsRequest;
+                },
+                listOffsetsResponse(
+                    Collections.emptyMap(),
+                    Collections.singletonMap(tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+                ));
+        }
+
+        return consumer;
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
     public static class DeserializerForClientId implements Deserializer<byte[]> {
         @Override