Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/13 00:56:57 UTC

[kafka] branch 2.0 updated: KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new b543c74  KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
b543c74 is described below

commit b543c74e6a0a86b61a25a2c39ac853084552053c
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Tue Jun 12 16:29:50 2018 -0700

    KAFKA-6979; Add `default.api.timeout.ms` to KafkaConsumer (KIP-266) (#5122)
    
    Adds a configuration that specifies the default timeout for KafkaConsumer APIs that could block. This was introduced in KIP-266.
    
    Reviewers: Satish Duggana <sa...@gmail.com>, Jason Gustafson <ja...@confluent.io>
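
For illustration only (not part of this commit): a minimal sketch of how a client might set the new configuration. The broker address, group id and topic below are placeholders.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class DefaultApiTimeoutExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // Added by this commit (KIP-266): default timeout for blocking consumer APIs.
            props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 30000);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("example-topic"));
                // Blocking calls such as commitSync(), committed() and partitionsFor()
                // now give up after 30 seconds and throw TimeoutException instead of
                // blocking indefinitely or falling back to request.timeout.ms.
                consumer.poll(Duration.ofMillis(100));
            }
        }
    }

If the property is omitted, the consumer falls back to the 60-second default defined in ConsumerConfig in the diff below.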
---
 .../kafka/clients/consumer/ConsumerConfig.java     | 10 +++++
 .../kafka/clients/consumer/KafkaConsumer.java      | 52 +++++++++++++++-------
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +
 docs/upgrade.html                                  | 11 +++--
 4 files changed, 54 insertions(+), 21 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 72e496c..bc9a716 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -218,6 +218,10 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>default.api.timeout.ms</code> */
+    public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = "default.api.timeout.ms";
+    public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for consumer APIs that could block. This configuration is used as the default timeout for all consumer operations that do not explicitly accept a <code>timeout</code> parameter.";
+
     /** <code>interceptor.classes</code> */
     public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
     public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
@@ -403,6 +407,12 @@ public class ConsumerConfig extends AbstractConfig {
                                         atLeast(0),
                                         Importance.MEDIUM,
                                         REQUEST_TIMEOUT_MS_DOC)
+                                .define(DEFAULT_API_TIMEOUT_MS_CONFIG,
+                                        Type.INT,
+                                        60 * 1000,
+                                        atLeast(0),
+                                        Importance.MEDIUM,
+                                        DEFAULT_API_TIMEOUT_MS_DOC)
                                 /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
                                 .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
                                         Type.LONG,
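
Not part of the patch, but a quick sanity check of the define() above: building a ConsumerConfig without an explicit override resolves the new setting to 60000 ms, and the atLeast(0) validator rejects negative values up front. The sketch assumes ConsumerConfig's public Properties-based constructor and placeholder deserializer/broker settings; no broker is contacted.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class DefaultApiTimeoutResolution {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder, never contacted
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // default.api.timeout.ms is not set, so the define() above supplies 60 * 1000.
            ConsumerConfig config = new ConsumerConfig(props);
            System.out.println(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG)); // 60000
        }
    }
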
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 5bd6b93..d6973c0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -567,6 +567,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Metadata metadata;
     private final long retryBackoffMs;
     private final long requestTimeoutMs;
+    private final int defaultApiTimeoutMs;
     private volatile boolean closed = false;
     private List<PartitionAssignor> assignors;
 
@@ -666,6 +667,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             log.debug("Initializing the Kafka consumer");
             this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            this.defaultApiTimeoutMs = config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
             int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
             int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
@@ -814,6 +816,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                   Metadata metadata,
                   long retryBackoffMs,
                   long requestTimeoutMs,
+                  int defaultApiTimeoutMs,
                   List<PartitionAssignor> assignors) {
         this.log = logContext.logger(getClass());
         this.clientId = clientId;
@@ -829,6 +832,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         this.metadata = metadata;
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
+        this.defaultApiTimeoutMs = defaultApiTimeoutMs;
         this.assignors = assignors;
     }
 
@@ -1268,8 +1272,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * This is a synchronous commits and will block until either the commit succeeds or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      * <p>
      * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
@@ -1286,10 +1291,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout specified by {@code default.api.timeout.ms} expires
+     *            before successful completion of the offset commit
      */
     @Override
     public void commitSync() {
-        commitSync(Duration.ofMillis(Long.MAX_VALUE));
+        commitSync(Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1343,7 +1350,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * i.e. lastProcessedMessageOffset + 1.
      * <p>
      * This is a synchronous commit and will block until either the commit succeeds or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      * <p>
      * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
      * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
@@ -1362,10 +1370,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws java.lang.IllegalArgumentException if the committed offset is negative
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
      *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
+     *            of the offset commit
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+        commitSync(offsets, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1560,7 +1570,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * This method may issue a remote call to the server if there is no current position for the given partition.
      * <p>
      * This call will block until either the position could be determined or an unrecoverable error is
-     * encountered (in which case it is thrown to the caller).
+     * encountered (in which case it is thrown to the caller), or the timeout specified by {@code default.api.timeout.ms} expires
+     * (in which case a {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      *
      * @param partition The partition to get the position for
      * @return The current position of the consumer (that is, the offset of the next record to be fetched)
@@ -1575,10 +1586,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the
+     *             timeout specified by {@code default.api.timeout.ms} expires
      */
     @Override
     public long position(TopicPartition partition) {
-        return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+        return position(partition, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1641,7 +1654,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * Get the last committed offset for the given partition (whether the commit happened by this process or
      * another). This offset will be used as the position for the consumer in the event of a failure.
      * <p>
-     * This call will block to do a remote call to get the latest committed offsets from the server.
+     * This call makes a remote request to get the latest committed offset from the server, and will block until the
+     * committed offset is retrieved successfully, an unrecoverable error is encountered (in which case it is thrown to
+     * the caller), or the timeout specified by {@code default.api.timeout.ms} expires (in which case a
+     * {@link org.apache.kafka.common.errors.TimeoutException} is thrown to the caller).
      *
      * @param partition The partition to check
      * @return The last committed offset and metadata or null if there was no prior commit
@@ -1653,10 +1669,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
+     *             the timeout specified by {@code default.api.timeout.ms} expires.
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
-        return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+        return committed(partition, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1718,11 +1736,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} expires.
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+        return partitionsFor(topic, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1774,11 +1792,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             this function is called
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} expires.
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
-        return listTopics(Duration.ofMillis(requestTimeoutMs));
+        return listTopics(Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1879,13 +1897,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
-     *         the amount of time allocated by {@code request.timeout.ms} expires.
+     *         the amount of time allocated by {@code default.api.timeout.ms} expires.
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
      *         the offsets by timestamp
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs));
+        return offsetsForTimes(timestampsToSearch, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
@@ -1939,11 +1957,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     *         expiration of the configured {@code default.api.timeout.ms}
      */
     @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
-        return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+        return beginningOffsets(partitions, Duration.ofMillis(defaultApiTimeoutMs));
     }
 
     /**
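
The net effect of the KafkaConsumer changes above: commitSync, position and committed switch from effectively unbounded waits (Long.MAX_VALUE) to default.api.timeout.ms, and the metadata APIs (partitionsFor, listTopics, offsetsForTimes, beginningOffsets) switch from request.timeout.ms to the same setting. An illustrative sketch of what callers can now expect (the consumer, topic name and offsets below are placeholders):

    import java.time.Duration;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;

    public class BoundedBlockingCalls {
        static void commitProcessedOffsets(KafkaConsumer<String, String> consumer,
                                           Map<TopicPartition, OffsetAndMetadata> offsets) {
            try {
                // Bounded by default.api.timeout.ms (60s unless overridden) rather than
                // waiting forever on an unreachable coordinator.
                consumer.commitSync(offsets);
            } catch (TimeoutException e) {
                // New outcome with this change: the commit did not complete in time.
            }

            // The KIP-266 overload still allows a tighter per-call bound when needed.
            consumer.commitSync(offsets, Duration.ofSeconds(5));

            // Metadata lookups are now bounded by default.api.timeout.ms as well,
            // instead of request.timeout.ms as before this commit.
            consumer.partitionsFor("example-topic");
        }
    }
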
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 4be6884..b8681e8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1748,6 +1748,7 @@ public class KafkaConsumerTest {
         String metricGroupPrefix = "consumer";
         long retryBackoffMs = 100;
         long requestTimeoutMs = 30000;
+        int defaultApiTimeoutMs = 30000;
         boolean excludeInternalTopics = true;
         int minBytes = 1;
         int maxBytes = Integer.MAX_VALUE;
@@ -1825,6 +1826,7 @@ public class KafkaConsumerTest {
                 metadata,
                 retryBackoffMs,
                 requestTimeoutMs,
+                defaultApiTimeoutMs,
                 assignors);
     }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 3c75bad..7061d6c 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -98,10 +98,13 @@
         <code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
         <code>internal.value.converter.schemas.enable=false</code>
     </li>
-    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds overloads to the consumer to support
-        timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
-        does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
-        will be removed in a future version.</li>
+    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds a new consumer configuration <code>default.api.timeout.ms</code>
+        to specify the default timeout to use for <code>KafkaConsumer</code> APIs that could block. The KIP also adds overloads of these blocking
+        APIs that accept an explicit per-call timeout instead of the default set by <code>default.api.timeout.ms</code>.
+        In particular, a new <code>poll(Duration)</code> API has been added which does not block for dynamic partition assignment.
+        The old <code>poll(long)</code> API has been deprecated and will be removed in a future version. Overloads have also been added
+        for other <code>KafkaConsumer</code> methods like <code>partitionsFor</code>, <code>listTopics</code>, <code>offsetsForTimes</code>,
+        <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that accept a <code>Duration</code>.</li>
     <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The tool kafka.tools.ReplayLogProducer has been removed.</li>
     <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
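
For the poll(Duration) change described in the upgrade note above, a brief illustrative sketch (not from the commit):

    import java.time.Duration;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollMigration {
        @SuppressWarnings("deprecation")
        static void pollOnce(KafkaConsumer<String, String> consumer) {
            // Deprecated in 2.0: may still block indefinitely waiting for partition
            // assignment, regardless of the 100 ms argument.
            ConsumerRecords<String, String> legacy = consumer.poll(100);

            // Preferred: the entire wait, including assignment metadata, is bounded
            // by the supplied Duration.
            ConsumerRecords<String, String> bounded = consumer.poll(Duration.ofMillis(100));
        }
    }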
