You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/30 15:30:02 UTC

[kafka] branch trunk updated: KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f24a62d  KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014)
f24a62d is described below

commit f24a62d4acfc48e23dd4bbb854668a727e9445a8
Author: ConcurrencyPractitioner <yo...@gmail.com>
AuthorDate: Wed May 30 08:29:54 2018 -0700

    KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014)
    
    This patch implements the consumer timeout APIs from KIP-266 (everything except `poll()`, which was done separately).
    
    Reviewers:  John Roesler <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/clients/consumer/Consumer.java    |  51 +++-
 .../kafka/clients/consumer/KafkaConsumer.java      | 305 +++++++++++++++++++--
 .../kafka/clients/consumer/MockConsumer.java       |  46 ++++
 3 files changed, 370 insertions(+), 32 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index acb53e1..2e8ad2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -98,6 +98,10 @@ public interface Consumer<K, V> extends Closeable {
     void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
 
     /**
+     * @see KafkaConsumer#commitSync(Map, Duration)
+     */
+    void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
+    /**
      * @see KafkaConsumer#commitAsync()
      */
     void commitAsync();
@@ -131,6 +135,11 @@ public interface Consumer<K, V> extends Closeable {
      * @see KafkaConsumer#position(TopicPartition)
      */
     long position(TopicPartition partition);
+    
+    /**
+     * @see KafkaConsumer#position(TopicPartition, Duration)
+     */
+    long position(TopicPartition partition, final Duration timeout);
 
     /**
      * @see KafkaConsumer#committed(TopicPartition)
@@ -138,6 +147,11 @@ public interface Consumer<K, V> extends Closeable {
     OffsetAndMetadata committed(TopicPartition partition);
 
     /**
+     * @see KafkaConsumer#committed(TopicPartition, Duration)
+     */
+    OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);
+
+    /**
      * @see KafkaConsumer#metrics()
      */
     Map<MetricName, ? extends Metric> metrics();
@@ -148,11 +162,21 @@ public interface Consumer<K, V> extends Closeable {
     List<PartitionInfo> partitionsFor(String topic);
 
     /**
+     * @see KafkaConsumer#partitionsFor(String, Duration)
+     */
+    List<PartitionInfo> partitionsFor(String topic, Duration timeout);
+
+    /**
      * @see KafkaConsumer#listTopics()
      */
     Map<String, List<PartitionInfo>> listTopics();
 
     /**
+     * @see KafkaConsumer#listTopics(Duration)
+     */
+    Map<String, List<PartitionInfo>> listTopics(Duration timeout);
+
+    /**
      * @see KafkaConsumer#paused()
      */
     Set<TopicPartition> paused();
@@ -168,21 +192,36 @@ public interface Consumer<K, V> extends Closeable {
     void resume(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#offsetsForTimes(java.util.Map)
+     * @see KafkaConsumer#offsetsForTimes(Map)
      */
     Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
 
     /**
-     * @see KafkaConsumer#beginningOffsets(java.util.Collection)
+     * @see KafkaConsumer#offsetsForTimes(Map, Duration)
+     */
+    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
+
+    /**
+     * @see KafkaConsumer#beginningOffsets(Collection)
      */
     Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
 
     /**
-     * @see KafkaConsumer#endOffsets(java.util.Collection)
+     * @see KafkaConsumer#beginningOffsets(Collection, Duration)
+     */
+    Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout);
+
+    /**
+     * @see KafkaConsumer#endOffsets(Collection)
      */
     Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
 
     /**
+     * @see KafkaConsumer#endOffsets(Collection, Duration)
+     */
+    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeoutMs);
+
+    /**
      * @see KafkaConsumer#close()
      */
     void close();
@@ -190,9 +229,15 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#close(long, TimeUnit)
      */
+    @Deprecated
     void close(long timeout, TimeUnit unit);
 
     /**
+     * @see KafkaConsumer#close(Duration)
+     */
+    void close(Duration timeout);
+
+    /**
      * @see KafkaConsumer#wakeup()
      */
     void wakeup();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 602c9d7..feaadd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -1321,16 +1322,55 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+        commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+    * Commit the specified offsets for the specified list of topics and partitions.
+    * <p>
+    * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+    * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+    * should not be used. The committed offset should be the next message your application will consume,
+    * i.e. lastProcessedMessageOffset + 1.
+    * <p>
+    * This is a synchronous commits and will block until either the commit succeeds, an unrecoverable error is
+    * encountered (in which case it is thrown to the caller), or the timeout expires.
+    * <p>
+    * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+    * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+    *
+    * @param offsets A map of offsets by partition with associated metadata
+    * @param timeout The maximum amount of time to await completion of the offset commit
+    * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+    *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+    *             or if there is an active group with the same groupId which is using group management.
+    * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+    *             function is called
+    * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+    *             this function is called
+    * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+    * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+    *             configured groupId. See the exception for more details
+    * @throws java.lang.IllegalArgumentException if the committed offset is negative
+    * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+    *             is too large or if the topic does not exist).
+    * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
+     *            of the offset commit
+    */
+    @Override
+    public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE);
+            if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) {
+                throw new TimeoutException("Committing offsets synchronously took too long.");
+            }
         } finally {
             release();
         }
     }
 
     /**
-     * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partition.
+     * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partition.
      * Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
      */
     @Override
@@ -1426,6 +1466,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws IllegalArgumentException if {@code partitions} is {@code null}
      * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
      */
+    @Override
     public void seekToBeginning(Collection<TopicPartition> partitions) {
         if (partitions == null)
             throw new IllegalArgumentException("Partitions collection cannot be null");
@@ -1453,6 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws IllegalArgumentException if {@code partitions} is {@code null}
      * @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
      */
+    @Override
     public void seekToEnd(Collection<TopicPartition> partitions) {
         if (partitions == null)
             throw new IllegalArgumentException("Partitions collection cannot be null");
@@ -1490,20 +1532,59 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             configured groupId. See the exception for more details
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
+    @Override
     public long position(TopicPartition partition) {
+        return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+     * This method may issue a remote call to the server if there is no current position 
+     * for the given partition.
+     * <p>
+     * This call will block until the position can be determined, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the timeout expires.
+     *
+     * @param partition The partition to get the position for
+     * @param timeout The maximum amount of time to await determination of the current position
+     * @return The current position of the consumer (that is, the offset of the next record to be fetched)
+     * @throws IllegalArgumentException if the provided TopicPartition is not assigned to this consumer
+     * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
+     *             the partition
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the
+     *             passed timeout expires
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    @Override
+    public long position(TopicPartition partition, final Duration timeout) {
+        final long timeoutMs = timeout.toMillis();
         acquireAndEnsureOpen();
         try {
             if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
             Long offset = this.subscriptions.position(partition);
-            while (offset == null) {
+            final long startMs = time.milliseconds();
+            long finishMs = startMs;
+
+            while (offset == null && finishMs - startMs < timeoutMs) {
                 // batch update fetch positions for any partitions without a valid position
-                while (!updateFetchPositions(Long.MAX_VALUE)) {
-                    log.warn("Still updating fetch positions");
+                if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs))) {
+                    break;
                 }
-                client.poll(retryBackoffMs);
+                finishMs = time.milliseconds();
+
+                client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs - startMs));
                 offset = this.subscriptions.position(partition);
+                finishMs = time.milliseconds();
             }
+            if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired.");
             return offset;
         } finally {
             release();
@@ -1529,14 +1610,37 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
+        return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Get the last committed offset for the given partition (whether the commit happened by this process or
+     * another). This offset will be used as the position for the consumer in the event of a failure.
+     * <p>
+     * This call will block to do a remote call to get the latest committed offsets from the server.
+     *
+     * @param partition The partition to check
+     * @param timeout  The maximum amount of time to await the current committed offset
+     * @return The last committed offset and metadata or null if there was no prior commit
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
+     *             expiration of the timeout
+     */
+    @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            Map<TopicPartition, OffsetAndMetadata> offsets = null;
-            while (offsets == null) {
-                offsets = coordinator.fetchCommittedOffsets(
-                    Collections.singleton(partition),
-                    Long.MAX_VALUE
-                );
+            Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
+                    Collections.singleton(partition), timeout.toMillis());
+            if (offsets == null) {
+                throw new TimeoutException("Unable to find committed offsets for partition within set duration.");
             }
             return offsets.get(partition);
         } finally {
@@ -1565,13 +1669,38 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             this function is called
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
-     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
-     *             expiration of the configured request timeout
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         the amount of time allocated by {@code request.timeout.ms} expires.
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
+        return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+     * does not already have any metadata about the given topic.
+     *
+     * @param topic The topic to get partition metadata for
+     * @param timeout The maximum of time to await topic metadata
+     *
+     * @return The list of partitions
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See
+     *             the exception for more details
+     * @throws org.apache.kafka.common.errors.TimeoutException if topic metadata cannot be fetched before expiration
+     *             of the passed timeout
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
         acquireAndEnsureOpen();
+        long timeoutMs = timeout.toMillis();
         try {
             Cluster cluster = this.metadata.fetch();
             List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
@@ -1579,7 +1708,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 return parts;
 
             Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
-                    new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
+                    new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
             return topicMetadata.get(topic);
         } finally {
             release();
@@ -1596,15 +1725,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             function is called
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
      *             this function is called
-     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
-     *             expiration of the configured request timeout
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         the amount of time allocated by {@code request.timeout.ms} expires.
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
+        return listTopics(Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
+     * remote call to the server.
+     *
+     * @param timeout The maximum time this operation will block to fetch topic metadata
+     *
+     * @return The map of topics and its partitions
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+     *             expiration of the passed timeout
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+     */
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            return fetcher.getAllTopicMetadata(requestTimeoutMs);
+            return fetcher.getAllTopicMetadata(timeout.toMillis());
         } finally {
             release();
         }
@@ -1683,12 +1832,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
      * @throws IllegalArgumentException if the target timestamp is negative
      * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     *         the amount of time allocated by {@code request.timeout.ms} expires.
      * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
      *         the offsets by timestamp
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+        return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
+     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
+     *
+     * This is a blocking call. The consumer does not have to be assigned the partitions.
+     * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
+     * will be returned for that partition.
+     *
+     * @param timestampsToSearch the mapping from partition to the timestamp to look up.
+     * @param timeout The maximum amount of time to await retrieval of the offsets
+     *
+     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
+     *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
+     *         such message.
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
+     * @throws IllegalArgumentException if the target timestamp is negative
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         expiration of the passed timeout
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
+     *         the offsets by timestamp
+     */
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
         acquireAndEnsureOpen();
         try {
             for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
@@ -1698,7 +1874,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                             entry.getValue() + ". The target time cannot be negative.");
             }
-            return fetcher.offsetsByTimes(timestampsToSearch, requestTimeoutMs);
+            return fetcher.offsetsByTimes(timestampsToSearch, timeout.toMillis());
         } finally {
             release();
         }
@@ -1715,14 +1891,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The earliest available offsets for the given partitions
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
-     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
      *         expiration of the configured {@code request.timeout.ms}
      */
     @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+        return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get the first offset for the given partitions.
+     * <p>
+     * This method does not change the current consumer position of the partitions.
+     *
+     * @see #seekToBeginning(Collection)
+     *
+     * @param partitions the partitions to get the earliest offsets
+     * @param timeout The maximum amount of time to await retrieval of the beginning offsets
+     *
+     * @return The earliest available offsets for the given partitions
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         expiration of the passed timeout
+     */
+    @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+            return fetcher.beginningOffsets(partitions, timeout.toMillis());
         } finally {
             release();
         }
@@ -1744,14 +1941,40 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The end offsets for the given partitions.
      * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
      * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
-     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
-     *         expiration of the configured {@code request.timeout.ms}
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+     *         the amount of time allocated by {@code request.timeout.ms} expires
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+        return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+    }
+
+    /**
+     * Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end
+     * offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For
+     * {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of
+     * the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been
+     * written to, the end offset is 0.
+     *
+     * <p>
+     * This method does not change the current consumer position of the partitions.
+     *
+     * @see #seekToEnd(Collection)
+     *
+     * @param partitions the partitions to get the end offsets.
+     * @param timeout The maximum amount of time to await retrieval of the end offsets
+     *
+     * @return The end offsets for the given partitions.
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
+     * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
+     *         expiration of the passed timeout
+     */
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            return fetcher.endOffsets(partitions, requestTimeoutMs);
+            return fetcher.endOffsets(partitions, timeout.toMillis());
         } finally {
             release();
         }
@@ -1760,7 +1983,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     /**
      * Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
      * If auto-commit is enabled, this will commit the current offsets if possible within the default
-     * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()}
+     * timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()}
      * cannot be used to interrupt close.
      *
      * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
@@ -1769,7 +1992,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void close() {
-        close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
     }
 
     /**
@@ -1786,15 +2009,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws IllegalArgumentException If the {@code timeout} is negative.
      * @throws InterruptException If the thread is interrupted before or while this function is called
      * @throws org.apache.kafka.common.KafkaException for any other error during close
+     *
+     * @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}.
      */
+    @Deprecated
+    @Override
     public void close(long timeout, TimeUnit timeUnit) {
-        if (timeout < 0)
+        close(Duration.ofMillis(TimeUnit.MILLISECONDS.toMillis(timeout)));
+    }
+
+    /**
+     * Tries to close the consumer cleanly within the specified timeout. This method waits up to
+     * {@code timeout} for the consumer to complete pending commits and leave the group.
+     * If auto-commit is enabled, this will commit the current offsets if possible within the
+     * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
+     * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
+     * used to interrupt close.
+     *
+     * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
+     *                non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
+     *
+     * @throws IllegalArgumentException If the {@code timeout} is negative.
+     * @throws InterruptException If the thread is interrupted before or while this function is called
+     * @throws org.apache.kafka.common.KafkaException for any other error during close
+     */
+    @Override
+    public void close(Duration timeout) {
+        if (timeout.toMillis() < 0)
             throw new IllegalArgumentException("The timeout cannot be negative.");
         acquire();
         try {
             if (!closed) {
                 closed = true;
-                close(timeUnit.toMillis(timeout), false);
+                close(timeout.toMillis(), false);
             }
         } finally {
             release();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 479a9ff..3502156 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -252,6 +252,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
+        commitSync(offsets);
+    }
+
+    @Override
     public synchronized void seek(TopicPartition partition, long offset) {
         ensureNotClosed();
         subscriptions.seek(partition, offset);
@@ -267,6 +272,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
+        return committed(partition);
+    }
+
+    @Override
     public synchronized long position(TopicPartition partition) {
         ensureNotClosed();
         if (!this.subscriptions.isAssigned(partition))
@@ -280,6 +290,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public synchronized long position(TopicPartition partition, final Duration timeout) {
+        return position(partition);
+    }
+
+    @Override
     public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
         ensureNotClosed();
         for (TopicPartition tp : partitions)
@@ -470,4 +485,35 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         }
         return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
     }
+
+    @Override
+    public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
+        return partitionsFor(topic);
+    }
+
+    @Override
+    public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
+        return listTopics();
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
+            Duration timeout) {
+        return offsetsForTimes(timestampsToSearch);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+        return beginningOffsets(partitions);
+    }
+
+    @Override
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration duration) {
+        return endOffsets(partitions);
+    }
+
+    @Override
+    public void close(Duration timeout) {
+        close();
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.