Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/30 15:30:02 UTC
[kafka] branch trunk updated: KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f24a62d KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014)
f24a62d is described below
commit f24a62d4acfc48e23dd4bbb854668a727e9445a8
Author: ConcurrencyPractitioner <yo...@gmail.com>
AuthorDate: Wed May 30 08:29:54 2018 -0700
KAFKA-6608; Add timeout parameter to blocking consumer calls [KIP-266] (#5014)
This patch implements the consumer timeout APIs from KIP-266 (everything except `poll()`, which was done separately).
Reviewers: John Roesler <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
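For illustration, here is a minimal usage sketch of the new overloads (the broker address, group id, and topic name below are placeholders, not part of this patch). It shows how the Duration argument bounds a call that previously could block indefinitely:

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.TimeoutException;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BoundedConsumerCalls {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "kip-266-demo");             // placeholder group id
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            TopicPartition tp = new TopicPartition("my-topic", 0);                 // placeholder topic
            consumer.assign(Collections.singleton(tp));
            try {
                // Blocks for at most five seconds instead of indefinitely.
                long position = consumer.position(tp, Duration.ofSeconds(5));
                System.out.println("Next offset to fetch: " + position);
            } catch (TimeoutException e) {
                // The position could not be established within the timeout.
                System.err.println("position() timed out: " + e.getMessage());
            } finally {
                consumer.close(Duration.ofSeconds(10));
            }
        }
    }

The shorter sketches interleaved with the diff below reuse this `consumer` and `tp`.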
---
.../apache/kafka/clients/consumer/Consumer.java | 51 +++-
.../kafka/clients/consumer/KafkaConsumer.java | 305 +++++++++++++++++++--
.../kafka/clients/consumer/MockConsumer.java | 46 ++++
3 files changed, 370 insertions(+), 32 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index acb53e1..2e8ad2c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -98,6 +98,10 @@ public interface Consumer<K, V> extends Closeable {
void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
/**
+ * @see KafkaConsumer#commitSync(Map, Duration)
+ */
+ void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);
+ /**
* @see KafkaConsumer#commitAsync()
*/
void commitAsync();
@@ -131,6 +135,11 @@ public interface Consumer<K, V> extends Closeable {
* @see KafkaConsumer#position(TopicPartition)
*/
long position(TopicPartition partition);
+
+ /**
+ * @see KafkaConsumer#position(TopicPartition, Duration)
+ */
+ long position(TopicPartition partition, final Duration timeout);
/**
* @see KafkaConsumer#committed(TopicPartition)
@@ -138,6 +147,11 @@ public interface Consumer<K, V> extends Closeable {
OffsetAndMetadata committed(TopicPartition partition);
/**
+ * @see KafkaConsumer#committed(TopicPartition, Duration)
+ */
+ OffsetAndMetadata committed(TopicPartition partition, final Duration timeout);
+
+ /**
* @see KafkaConsumer#metrics()
*/
Map<MetricName, ? extends Metric> metrics();
@@ -148,11 +162,21 @@ public interface Consumer<K, V> extends Closeable {
List<PartitionInfo> partitionsFor(String topic);
/**
+ * @see KafkaConsumer#partitionsFor(String, Duration)
+ */
+ List<PartitionInfo> partitionsFor(String topic, Duration timeout);
+
+ /**
* @see KafkaConsumer#listTopics()
*/
Map<String, List<PartitionInfo>> listTopics();
/**
+ * @see KafkaConsumer#listTopics(Duration)
+ */
+ Map<String, List<PartitionInfo>> listTopics(Duration timeout);
+
+ /**
* @see KafkaConsumer#paused()
*/
Set<TopicPartition> paused();
@@ -168,21 +192,36 @@ public interface Consumer<K, V> extends Closeable {
void resume(Collection<TopicPartition> partitions);
/**
- * @see KafkaConsumer#offsetsForTimes(java.util.Map)
+ * @see KafkaConsumer#offsetsForTimes(Map)
*/
Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
/**
- * @see KafkaConsumer#beginningOffsets(java.util.Collection)
+ * @see KafkaConsumer#offsetsForTimes(Map, Duration)
+ */
+ Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout);
+
+ /**
+ * @see KafkaConsumer#beginningOffsets(Collection)
*/
Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions);
/**
- * @see KafkaConsumer#endOffsets(java.util.Collection)
+ * @see KafkaConsumer#beginningOffsets(Collection, Duration)
+ */
+ Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout);
+
+ /**
+ * @see KafkaConsumer#endOffsets(Collection)
*/
Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions);
/**
+ * @see KafkaConsumer#endOffsets(Collection, Duration)
+ */
+ Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
+
+ /**
* @see KafkaConsumer#close()
*/
void close();
@@ -190,9 +229,15 @@ public interface Consumer<K, V> extends Closeable {
/**
* @see KafkaConsumer#close(long, TimeUnit)
*/
+ @Deprecated
void close(long timeout, TimeUnit unit);
/**
+ * @see KafkaConsumer#close(Duration)
+ */
+ void close(Duration timeout);
+
+ /**
* @see KafkaConsumer#wakeup()
*/
void wakeup();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 602c9d7..feaadd1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -1321,16 +1322,55 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
+ commitSync(offsets, Duration.ofMillis(Long.MAX_VALUE));
+ }
+
+ /**
+ * Commit the specified offsets for the specified list of topics and partitions.
+ * <p>
+ * This commits offsets to Kafka. The offsets committed using this API will be used on the first fetch after every
+ * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+ * should not be used. The committed offset should be the next message your application will consume,
+ * i.e. lastProcessedMessageOffset + 1.
+ * <p>
+ * This is a synchronous commit and will block until either the commit succeeds, an unrecoverable error is
+ * encountered (in which case it is thrown to the caller), or the timeout expires.
+ * <p>
+ * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+ * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+ *
+ * @param offsets A map of offsets by partition with associated metadata
+ * @param timeout The maximum amount of time to await completion of the offset commit
+ * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+ * This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+ * or if there is an active group with the same groupId which is using group management.
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws java.lang.IllegalArgumentException if the committed offset is negative
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+ * is too large or if the topic does not exist).
+ * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
+ * of the offset commit
+ */
+ @Override
+ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
acquireAndEnsureOpen();
try {
- coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE);
+ if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) {
+ throw new TimeoutException("Committing offsets synchronously took too long.");
+ }
} finally {
release();
}
}
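A usage sketch of the overload added above, reusing `consumer` and `tp` from the earlier example (`lastProcessedOffset` is a hypothetical variable holding the last offset the application handled):

    try {
        consumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(lastProcessedOffset + 1)),
                Duration.ofSeconds(10));
    } catch (TimeoutException e) {
        // The commit did not complete within ten seconds. It may still have
        // succeeded on the broker, so treat the outcome as unknown, not failed.
    }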
/**
- * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and partition.
+ * Commit offsets returned on the last {@link #poll(Duration)} for all the subscribed list of topics and partitions.
* Same as {@link #commitAsync(OffsetCommitCallback) commitAsync(null)}
*/
@Override
@@ -1426,6 +1466,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
+ @Override
public void seekToBeginning(Collection<TopicPartition> partitions) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot be null");
@@ -1453,6 +1494,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException if {@code partitions} is {@code null}
* @throws IllegalStateException if any of the provided partitions are not currently assigned to this consumer
*/
+ @Override
public void seekToEnd(Collection<TopicPartition> partitions) {
if (partitions == null)
throw new IllegalArgumentException("Partitions collection cannot be null");
@@ -1490,20 +1532,59 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* configured groupId. See the exception for more details
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
+ @Override
public long position(TopicPartition partition) {
+ return position(partition, Duration.ofMillis(Long.MAX_VALUE));
+ }
+
+ /**
+ * Get the offset of the <i>next record</i> that will be fetched (if a record with that offset exists).
+ * This method may issue a remote call to the server if there is no current position
+ * for the given partition.
+ * <p>
+ * This call will block until the position can be determined, an unrecoverable error is
+ * encountered (in which case it is thrown to the caller), or the timeout expires.
+ *
+ * @param partition The partition to get the position for
+ * @param timeout The maximum amount of time to await determination of the current position
+ * @return The current position of the consumer (that is, the offset of the next record to be fetched)
+ * @throws IllegalStateException if the provided TopicPartition is not assigned to this consumer
+ * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if no offset is currently defined for
+ * the partition
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.TimeoutException if the position cannot be determined before the
+ * passed timeout expires
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ */
+ @Override
+ public long position(TopicPartition partition, final Duration timeout) {
+ final long timeoutMs = timeout.toMillis();
acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalStateException("You can only check the position for partitions assigned to this consumer.");
Long offset = this.subscriptions.position(partition);
- while (offset == null) {
+ final long startMs = time.milliseconds();
+ long finishMs = startMs;
+
+ while (offset == null && finishMs - startMs < timeoutMs) {
// batch update fetch positions for any partitions without a valid position
- while (!updateFetchPositions(Long.MAX_VALUE)) {
- log.warn("Still updating fetch positions");
+ if (!updateFetchPositions(remainingTimeAtLeastZero(timeoutMs, time.milliseconds() - startMs))) {
+ break;
}
- client.poll(retryBackoffMs);
+ finishMs = time.milliseconds();
+
+ client.poll(remainingTimeAtLeastZero(timeoutMs, finishMs - startMs));
offset = this.subscriptions.position(partition);
+ finishMs = time.milliseconds();
}
+ if (offset == null) throw new TimeoutException("Timeout expired before the position for partition " + partition + " could be determined.");
return offset;
} finally {
release();
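Where the old implementation retried forever (logging "Still updating fetch positions"), callers can now bound the wait and decide what to do next, for example (same `consumer`/`tp` assumptions as above):

    try {
        long next = consumer.position(tp, Duration.ofMillis(500));
        System.out.println("Position: " + next);
    } catch (TimeoutException e) {
        // Previously this call would have looped indefinitely; now the caller
        // chooses, e.g. retry on the next iteration of its own loop.
    }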
@@ -1529,14 +1610,37 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
+ return committed(partition, Duration.ofMillis(Long.MAX_VALUE));
+ }
+
+ /**
+ * Get the last committed offset for the given partition (whether the commit happened by this process or
+ * another). This offset will be used as the position for the consumer in the event of a failure.
+ * <p>
+ * This call will block until the latest committed offsets are fetched from the server or the timeout expires.
+ *
+ * @param partition The partition to check
+ * @param timeout The maximum amount of time to await the current committed offset
+ * @return The last committed offset and metadata or null if there was no prior commit
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+ * configured groupId. See the exception for more details
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the committed offset cannot be found before
+ * expiration of the timeout
+ */
+ @Override
+ public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
acquireAndEnsureOpen();
try {
- Map<TopicPartition, OffsetAndMetadata> offsets = null;
- while (offsets == null) {
- offsets = coordinator.fetchCommittedOffsets(
- Collections.singleton(partition),
- Long.MAX_VALUE
- );
+ Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
+ Collections.singleton(partition), timeout.toMillis());
+ if (offsets == null) {
+ throw new TimeoutException("Unable to find committed offsets for partition within set duration.");
}
return offsets.get(partition);
} finally {
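A sketch of the common "resume from the last commit" pattern with a bounded lookup (same assumptions as above):

    OffsetAndMetadata committed = consumer.committed(tp, Duration.ofSeconds(5));
    if (committed != null) {
        consumer.seek(tp, committed.offset());                // resume where the group left off
    } else {
        consumer.seekToBeginning(Collections.singleton(tp));  // no prior commit for this partition
    }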
@@ -1565,13 +1669,38 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* this function is called
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See the exception for more details
- * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
- * expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+ * the amount of time allocated by {@code request.timeout.ms} expires.
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
+ return partitionsFor(topic, Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get metadata about the partitions for a given topic. This method will issue a remote call to the server if it
+ * does not already have any metadata about the given topic.
+ *
+ * @param topic The topic to get partition metadata for
+ * @param timeout The maximum amount of time to await topic metadata
+ *
+ * @return The list of partitions
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the specified topic. See
+ * the exception for more details
+ * @throws org.apache.kafka.common.errors.TimeoutException if topic metadata cannot be fetched before expiration
+ * of the passed timeout
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ */
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
acquireAndEnsureOpen();
+ long timeoutMs = timeout.toMillis();
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
@@ -1579,7 +1708,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
return parts;
Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(
- new MetadataRequest.Builder(Collections.singletonList(topic), true), requestTimeoutMs);
+ new MetadataRequest.Builder(Collections.singletonList(topic), true), timeoutMs);
return topicMetadata.get(topic);
} finally {
release();
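A usage sketch (placeholder topic name; the result may be null if no metadata is returned for the topic):

    List<PartitionInfo> parts = consumer.partitionsFor("my-topic", Duration.ofSeconds(2));
    if (parts != null) {
        for (PartitionInfo p : parts)
            System.out.println(p.topic() + "-" + p.partition() + " leader: " + p.leader());
    }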
@@ -1596,15 +1725,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* function is called
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
* this function is called
- * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
- * expiration of the configured request timeout
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+ * the amount of time allocated by {@code request.timeout.ms} expires.
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
+ return listTopics(Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get metadata about partitions for all topics that the user is authorized to view. This method will issue a
+ * remote call to the server.
+ *
+ * @param timeout The maximum time this operation will block to fetch topic metadata
+ *
+ * @return The map of topics and their partitions
+ * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+ * function is called
+ * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+ * this function is called
+ * @throws org.apache.kafka.common.errors.TimeoutException if the topic metadata could not be fetched before
+ * expiration of the passed timeout
+ * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
+ */
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
acquireAndEnsureOpen();
try {
- return fetcher.getAllTopicMetadata(requestTimeoutMs);
+ return fetcher.getAllTopicMetadata(timeout.toMillis());
} finally {
release();
}
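A usage sketch of the bounded metadata listing:

    Map<String, List<PartitionInfo>> topics = consumer.listTopics(Duration.ofSeconds(5));
    topics.forEach((name, partitions) ->
            System.out.println(name + ": " + partitions.size() + " partition(s)"));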
@@ -1683,12 +1832,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
* @throws IllegalArgumentException if the target timestamp is negative
* @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
- * expiration of the configured {@code request.timeout.ms}
+ * the amount of time allocated by {@code request.timeout.ms} expires.
* @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
* the offsets by timestamp
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
+ return offsetsForTimes(timestampsToSearch, Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
+ * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
+ *
+ * This is a blocking call. The consumer does not have to be assigned the partitions.
+ * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
+ * will be returned for that partition.
+ *
+ * @param timestampsToSearch the mapping from partition to the timestamp to look up.
+ * @param timeout The maximum amount of time to await retrieval of the offsets
+ *
+ * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
+ * than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
+ * such message.
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
+ * @throws IllegalArgumentException if the target timestamp is negative
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+ * expiration of the passed timeout
+ * @throws org.apache.kafka.common.errors.UnsupportedVersionException if the broker does not support looking up
+ * the offsets by timestamp
+ */
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {
acquireAndEnsureOpen();
try {
for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
@@ -1698,7 +1874,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
entry.getValue() + ". The target time cannot be negative.");
}
- return fetcher.offsetsByTimes(timestampsToSearch, requestTimeoutMs);
+ return fetcher.offsetsByTimes(timestampsToSearch, timeout.toMillis());
} finally {
release();
}
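A sketch of the typical "rewind to a point in time" use of this API, here roughly 24 hours back (same `consumer`/`tp` assumptions as above):

    long dayAgoMs = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
    Map<TopicPartition, OffsetAndTimestamp> result =
            consumer.offsetsForTimes(Collections.singletonMap(tp, dayAgoMs), Duration.ofSeconds(10));
    OffsetAndTimestamp found = result.get(tp);
    if (found != null)
        consumer.seek(tp, found.offset());  // first offset with timestamp >= dayAgoMs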
@@ -1715,14 +1891,35 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The earliest available offsets for the given partitions
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
- * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
* expiration of the configured {@code request.timeout.ms}
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
+ return beginningOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get the first offset for the given partitions.
+ * <p>
+ * This method does not change the current consumer position of the partitions.
+ *
+ * @see #seekToBeginning(Collection)
+ *
+ * @param partitions the partitions to get the earliest offsets
+ * @param timeout The maximum amount of time to await retrieval of the beginning offsets
+ *
+ * @return The earliest available offsets for the given partitions
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+ * expiration of the passed timeout
+ */
+ @Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
acquireAndEnsureOpen();
try {
- return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+ return fetcher.beginningOffsets(partitions, timeout.toMillis());
} finally {
release();
}
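A usage sketch, e.g. to check where a partition's retained log starts:

    Map<TopicPartition, Long> beginnings =
            consumer.beginningOffsets(Collections.singleton(tp), Duration.ofSeconds(5));
    System.out.println("Earliest available offset: " + beginnings.get(tp));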
@@ -1744,14 +1941,40 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The end offsets for the given partitions.
* @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
* @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
- * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
- * expiration of the configured {@code request.timeout.ms}
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offset metadata could not be fetched before
+ * the amount of time allocated by {@code request.timeout.ms} expires
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
+ return endOffsets(partitions, Duration.ofMillis(requestTimeoutMs));
+ }
+
+ /**
+ * Get the end offsets for the given partitions. In the default {@code read_uncommitted} isolation level, the end
+ * offset is the high watermark (that is, the offset of the last successfully replicated message plus one). For
+ * {@code read_committed} consumers, the end offset is the last stable offset (LSO), which is the minimum of
+ * the high watermark and the smallest offset of any open transaction. Finally, if the partition has never been
+ * written to, the end offset is 0.
+ *
+ * <p>
+ * This method does not change the current consumer position of the partitions.
+ *
+ * @see #seekToEnd(Collection)
+ *
+ * @param partitions the partitions to get the end offsets.
+ * @param timeout The maximum amount of time to await retrieval of the end offsets
+ *
+ * @return The end offsets for the given partitions.
+ * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+ * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic(s). See the exception for more details
+ * @throws org.apache.kafka.common.errors.TimeoutException if the offsets could not be fetched before
+ * expiration of the passed timeout
+ */
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
acquireAndEnsureOpen();
try {
- return fetcher.endOffsets(partitions, requestTimeoutMs);
+ return fetcher.endOffsets(partitions, timeout.toMillis());
} finally {
release();
}
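A sketch of the common consumer-lag calculation, now fully bounded (same assumptions as above):

    Map<TopicPartition, Long> ends =
            consumer.endOffsets(Collections.singleton(tp), Duration.ofSeconds(5));
    long lag = ends.get(tp) - consumer.position(tp, Duration.ofSeconds(5));
    System.out.println("Records not yet consumed: " + lag);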
@@ -1760,7 +1983,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
/**
* Close the consumer, waiting for up to the default timeout of 30 seconds for any needed cleanup.
* If auto-commit is enabled, this will commit the current offsets if possible within the default
- * timeout. See {@link #close(long, TimeUnit)} for details. Note that {@link #wakeup()}
+ * timeout. See {@link #close(Duration)} for details. Note that {@link #wakeup()}
* cannot be used to interrupt close.
*
* @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted
@@ -1769,7 +1992,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void close() {
- close(DEFAULT_CLOSE_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ close(Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS));
}
/**
@@ -1786,15 +2009,39 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException If the {@code timeout} is negative.
* @throws InterruptException If the thread is interrupted before or while this function is called
* @throws org.apache.kafka.common.KafkaException for any other error during close
+ *
+ * @deprecated Since 2.0. Use {@link #close(Duration)} or {@link #close()}.
*/
+ @Deprecated
+ @Override
public void close(long timeout, TimeUnit timeUnit) {
- if (timeout < 0)
+ close(Duration.ofMillis(timeUnit.toMillis(timeout)));
+ }
+
+ /**
+ * Tries to close the consumer cleanly within the specified timeout. This method waits up to
+ * {@code timeout} for the consumer to complete pending commits and leave the group.
+ * If auto-commit is enabled, this will commit the current offsets if possible within the
+ * timeout. If the consumer is unable to complete offset commits and gracefully leave the group
+ * before the timeout expires, the consumer is force closed. Note that {@link #wakeup()} cannot be
+ * used to interrupt close.
+ *
+ * @param timeout The maximum time to wait for consumer to close gracefully. The value must be
+ * non-negative. Specifying a timeout of zero means do not wait for pending requests to complete.
+ *
+ * @throws IllegalArgumentException If the {@code timeout} is negative.
+ * @throws InterruptException If the thread is interrupted before or while this function is called
+ * @throws org.apache.kafka.common.KafkaException for any other error during close
+ */
+ @Override
+ public void close(Duration timeout) {
+ if (timeout.toMillis() < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
acquire();
try {
if (!closed) {
closed = true;
- close(timeUnit.toMillis(timeout), false);
+ close(timeout.toMillis(), false);
}
} finally {
release();
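A migration sketch from the deprecated overload:

    // Before (deprecated by this patch):
    //     consumer.close(3, TimeUnit.SECONDS);
    // After: wait up to three seconds for pending commits and the group
    // leave request, then force-close; wakeup() cannot interrupt this call.
    consumer.close(Duration.ofSeconds(3));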
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 479a9ff..3502156 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -252,6 +252,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
+ commitSync(offsets);
+ }
+
+ @Override
public synchronized void seek(TopicPartition partition, long offset) {
ensureNotClosed();
subscriptions.seek(partition, offset);
@@ -267,6 +272,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ public OffsetAndMetadata committed(TopicPartition partition, final Duration timeout) {
+ return committed(partition);
+ }
+
+ @Override
public synchronized long position(TopicPartition partition) {
ensureNotClosed();
if (!this.subscriptions.isAssigned(partition))
@@ -280,6 +290,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
@Override
+ public synchronized long position(TopicPartition partition, final Duration timeout) {
+ return position(partition);
+ }
+
+ @Override
public synchronized void seekToBeginning(Collection<TopicPartition> partitions) {
ensureNotClosed();
for (TopicPartition tp : partitions)
@@ -470,4 +485,35 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
}
return offsets.size() > 1 ? offsets.remove(0) : offsets.get(0);
}
+
+ @Override
+ public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
+ return partitionsFor(topic);
+ }
+
+ @Override
+ public Map<String, List<PartitionInfo>> listTopics(Duration timeout) {
+ return listTopics();
+ }
+
+ @Override
+ public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch,
+ Duration timeout) {
+ return offsetsForTimes(timestampsToSearch);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+ return beginningOffsets(partitions);
+ }
+
+ @Override
+ public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
+ return endOffsets(partitions);
+ }
+
+ @Override
+ public void close(Duration timeout) {
+ close();
+ }
}
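Since the MockConsumer overrides above simply delegate to their untimed counterparts, the Duration argument is accepted but ignored, and tests can pass any value. A minimal sketch (JUnit's assertEquals assumed):

    MockConsumer<String, String> mock = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition tp = new TopicPartition("test", 0);
    mock.assign(Collections.singleton(tp));
    mock.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
    // The timeout is ignored by the mock, so this returns immediately.
    assertEquals(0L, mock.position(tp, Duration.ofMillis(1)));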