You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2022/02/03 09:36:50 UTC
[kafka] branch 3.1 updated: KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.1 by this push:
new ae09d9c KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
ae09d9c is described below
commit ae09d9c32793777fa3d3fc90ce67eb504bec9b2e
Author: dengziming <de...@gmail.com>
AuthorDate: Thu Feb 3 17:32:25 2022 +0800
KAFKA-13637: Use default.api.timeout.ms as default timeout value for KafkaConsumer.endOffsets (#11726)
We introduced `default.api.timeout.ms` in https://github.com/apache/kafka/commit/53ca52f855e903907378188d29224b3f9cefa6cb, but we missed updating `KafkaConsumer.endOffsets`, which still uses `request.timeout.ms`. This patch fixes this.
Reviewers: David Jacot <dj...@confluent.io>
---
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 58 +++++++++++++++++++++-
2 files changed, 58 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 286f84b..5e58bee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -2186,11 +2186,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
- * the amount of time allocated by {@code request.timeout.ms} expires
+ * the amount of time allocated by {@code default.api.timeout.ms} expires
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
- return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+ return endOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2872983..9b79473 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -164,6 +164,8 @@ public class KafkaConsumerTest {
private final TopicPartition t3p0 = new TopicPartition(topic3, 0);
private final int sessionTimeoutMs = 10000;
+ private final int defaultApiTimeoutMs = 60000;
+ private final int requestTimeoutMs = defaultApiTimeoutMs / 2;
private final int heartbeatIntervalMs = 1000;
// Set auto commit interval lower than heartbeat so we don't need to deal with
@@ -2618,8 +2620,6 @@ public class KafkaConsumerTest {
String clientId = "mock-consumer";
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
- int requestTimeoutMs = 30000;
- int defaultApiTimeoutMs = 30000;
int minBytes = 1;
int maxBytes = Integer.MAX_VALUE;
int maxWaitMs = 500;
@@ -2948,6 +2948,60 @@ public class KafkaConsumerTest {
() -> new KafkaConsumer<>(configs, new StringDeserializer(), new StringDeserializer()));
}
+ @Test
+ public void testOffsetsForTimesTimeout() {
+ final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+ assertEquals(
+ "Failed to get offsets by times in 60000ms",
+ assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.offsetsForTimes(singletonMap(tp0, 0L))).getMessage()
+ );
+ }
+
+ @Test
+ public void testBeginningOffsetsTimeout() {
+ final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+ assertEquals(
+ "Failed to get offsets by times in 60000ms",
+ assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.beginningOffsets(singletonList(tp0))).getMessage()
+ );
+ }
+
+ @Test
+ public void testEndOffsetsTimeout() {
+ final KafkaConsumer<String, String> consumer = consumerForCheckingTimeoutException();
+ assertEquals(
+ "Failed to get offsets by times in 60000ms",
+ assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> consumer.endOffsets(singletonList(tp0))).getMessage()
+ );
+ }
+
+ private KafkaConsumer<String, String> consumerForCheckingTimeoutException() {
+ final Time time = new MockTime();
+ SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST);
+ ConsumerMetadata metadata = createMetadata(subscription);
+ MockClient client = new MockClient(time, metadata);
+
+ initMetadata(client, singletonMap(topic, 1));
+
+ ConsumerPartitionAssignor assignor = new RangeAssignor();
+
+ final KafkaConsumer<String, String> consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId);
+
+ for (int i = 0; i < 10; i++) {
+ client.prepareResponse(
+ request -> {
+ time.sleep(defaultApiTimeoutMs / 10);
+ return request instanceof ListOffsetsRequest;
+ },
+ listOffsetsResponse(
+ Collections.emptyMap(),
+ Collections.singletonMap(tp0, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+ ));
+ }
+
+ return consumer;
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class DeserializerForClientId implements Deserializer<byte[]> {
@Override