You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/12 23:30:10 UTC
[kafka] branch trunk updated: KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 53ca52f KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
53ca52f is described below
commit 53ca52f855e903907378188d29224b3f9cefa6cb
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Tue Jun 12 16:29:50 2018 -0700
KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
Adds a configuration that specifies the default timeout for KafkaConsumer APIs that could block. This was introduced in KIP-266.
Reviewers: Satish Duggana <sa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
---
.../kafka/clients/consumer/ConsumerConfig.java | 10 +++++
.../kafka/clients/consumer/KafkaConsumer.java | 52 +++++++++++++++-------
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +
docs/upgrade.html | 11 +++--
4 files changed, 54 insertions(+), 21 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 72e496c..bc9a716 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -218,6 +218,10 @@ public class ConsumerConfig extends AbstractConfig {
public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+ /** <code>default.api.timeout.ms</code> */
+ public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
+ public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a <code>timeout</code> parameter.";
+
/** <code>interceptor.classes</code> */
public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
@@ -403,6 +407,12 @@ public class ConsumerConfig extends AbstractConfig {
atLeast(0),
Importance.MEDIUM,
REQUEST_TIMEOUT_MS_DOC)
+ .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
+ Type.INT,
+ 60 * 1000,
+ atLeast(0),
+ Importance.MEDIUM,
+ DEFAULT_API_TIMEOUT_MS_DOC)
/* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
.define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
Type.LONG,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5bd6b93..d6973c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -567,6 +567,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Metadata metadata;
private final long retryBackoffMs;
private final long requestTimeoutMs;
+ private final int defaultApiTimeoutMs;
private volatile boolean closed = false;
private List<PartitionAssignor> assignors;
@@ -666,6 +667,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+ this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
@@ -814,6 +816,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Metadata metadata,
long retryBackoffMs,
long requestTimeoutMs,
+ int defaultApiTimeoutMs,
List<PartitionAssignor> assignors) {
this.log = logContext.logger(getClass());
this.clientId = clientId;
@@ -829,6 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
this.metadata = metadata;
this.retryBackoffMs = retryBackoffMs;
this.requestTimeoutMs = requestTimeoutMs;
+ this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
}
@@ -1268,8 +1272,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
- * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
- * encountered (in which case it is thrown to the caller).
+ * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
+ * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
+ * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
* <p>
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
@@ -1286,10 +1291,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
* is too large or if the topic does not exist).
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
+ * before successful completion of the offset commit
*/
@Override
public void commitSync() {
- commitSync(Duration.ofMillis(Long.MAX_VALUE));
+ commitSync(Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1343,7 +1350,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* i.e. lastProcessedMessageOffset + 1.
* <p>
* This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
- * encountered (in which case it is thrown to the caller).
+ * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
+ * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
* <p>
* Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
* (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
@@ -1362,10 +1370,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws java.lang.IllegalArgumentException if the committed offset is negative
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
* is too large or if the topic does not exist).
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
+ * before successful completion of the offset commit
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
- commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+ commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1560,7 +1570,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* This method may issue a remote call to the server if there is no current position for the given partition.
* <p>
* This call will block until either the position could be determined or an unrecoverable error is
- * encountered (in which case it is thrown to the caller).
+ * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
+ * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partition The partition to get the position for
* @return The current position of the consumer (that is, the offset of the next record to be fetched)
@@ -1575,10 +1586,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the
+ * timeout specified by {@code default.api.timeout.ms} expires
*/
@Override
public long position(TopicPartition partition) {
- return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+ return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1641,7 +1654,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* Get the last committed offset for the given partition (whether the commit happened by this process or
* another). This offset will be used as the position for the consumer in the event of a failure.
* <p>
- * This call will block to do a remote call to get the latest committed offsets from the server.
+ * This call will do a remote call to get the latest committed offset from the server, and will block until the
+ * committed offset is gotten successfully, an unrecoverable error is encountered (in which case it is thrown to
+ * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
+ * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
*
* @param partition The partition to check
* @return The last committed offset and metadata or null if there was no prior commit
@@ -1653,10 +1669,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
+ * the timeout specified by {@code default.api.timeout.ms} expires.
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
- return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+ return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1718,11 +1736,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
- * the amount of time allocated by {@code request.timeout.ms} expires.
+ * the amount of time allocated by {@code default.api.timeout.ms} expires.
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+ return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1774,11 +1792,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* this function is called
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
- * the amount of time allocated by {@code request.timeout.ms} expires.
+ * the amount of time allocated by {@code default.api.timeout.ms} expires.
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
- return listTopics(Duration.ofMillis(requestTimeoutMs));
+ return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1879,13 +1897,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
- * the amount of time allocated by {@code request.timeout.ms} expires.
+ * the amount of time allocated by {@code default.api.timeout.ms} expires.
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
* the offsets by timestamp
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
- return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs));
+ return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
@@ -1939,11 +1957,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
- * expiration of the configured {@code request.timeout.ms}
+ * expiration of the configured {@code default.api.timeout.ms}
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
- return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+ return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4be6884..b8681e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1748,6 +1748,7 @@ public class KafkaConsumerTest {
String metricGroupPrefix = "consumer";
long retryBackoffMs = 100;
long requestTimeoutMs = 30000;
+ int defaultApiTimeoutMs = 30000;
boolean excludeInternalTopics = true;
int minBytes = 1;
int maxBytes = Integer.MAX_VALUE;
@@ -1825,6 +1826,7 @@ public class KafkaConsumerTest {
metadata,
retryBackoffMs,
requestTimeoutMs,
+ defaultApiTimeoutMs,
assignors);
}
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3c75bad..7061d6c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -98,10 +98,13 @@
<code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
<code>internal.value.converter.schemas.enable=false</code>
</li>
- <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds overloads to the consumer to support
- timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
- does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
- will be removed in a future version.</li>
+ <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds a new consumer configuration <code>default.api.timeout.ms</code>
+ to specify the default timeout to use for <code>KafkaConsumer</code> APIs that could block. The KIP also adds overloads for such blocking
+ APIs to support specifying a specific timeout to use for each of them instead of using the default timeout set by <code>default.api.timeout.ms</code>.
+ In particular, a new <code>poll(Duration)</code> API has been added which does not block for dynamic partition assignment.
+ The old <code>poll(long)</code> API has been deprecated and will be removed in a future version. Overloads have also been added
+ for other <code>KafkaConsumer</code> methods like <code>partitionsFor</code>, <code>listTopics</code>, <code>offsetsForTimes</code>,
+ <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li>
<li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
<li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
<li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.