You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/05/31 00:28:27 UTC

[kafka] branch trunk updated: MINOR: Follow-up improvements for KIP-266 (#5084)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3683d47  MINOR: Follow-up improvements for KIP-266 (#5084)
3683d47 is described below

commit 3683d475edb7dbb4990ba6da7861552a602537bd
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed May 30 17:28:17 2018 -0700

    MINOR: Follow-up improvements for KIP-266 (#5084)
    
    This patch contains a few follow-up improvements/cleanup for KIP-266:
    
    - Add upgrade notes
    - Add missing `commitSync(Duration)` API
    - Improve timeout messages and fix some naming inconsistencies
    - Various small cleanups
    
    Reviewers: John Roesler <jo...@confluent.io>, Guozhang Wang <wa...@gmail.com>
---
 .../apache/kafka/clients/consumer/Consumer.java    |  7 ++-
 .../kafka/clients/consumer/KafkaConsumer.java      | 59 +++++++++++++++++++---
 .../kafka/clients/consumer/MockConsumer.java       |  7 ++-
 .../consumer/internals/ConsumerCoordinator.java    | 41 ++++++++-------
 docs/upgrade.html                                  |  6 ++-
 5 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
index 2e8ad2c..07a6fb4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java
@@ -93,6 +93,11 @@ public interface Consumer<K, V> extends Closeable {
     void commitSync();
 
     /**
+     * @see KafkaConsumer#commitSync(Duration)
+     */
+    void commitSync(Duration timeout);
+
+    /**
      * @see KafkaConsumer#commitSync(Map)
      */
     void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);
@@ -219,7 +224,7 @@ public interface Consumer<K, V> extends Closeable {
     /**
      * @see KafkaConsumer#endOffsets(Collection, Duration)
      */
-    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeoutMs);
+    Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout);
 
     /**
      * @see KafkaConsumer#close()
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index feaadd1..5bd6b93 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1103,7 +1103,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             partitions to consume from
      *
      *
-     * @deprecated Since 2.0. Use {@link #poll(Duration)} to poll for records.
+     * @deprecated Since 2.0. Use {@link #poll(Duration)}, which does not block beyond the timeout awaiting partition
+     *             assignment. See <a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> for more information.
      */
     @Deprecated
     @Override
@@ -1119,6 +1120,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
      * offset for the subscribed list of partitions
      *
+     * <p>
+     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
+     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
+     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
+     *
      *
      * @param timeout The maximum time to block (must not be greater than {@link Long#MAX_VALUE} milliseconds)
      *
@@ -1283,9 +1289,46 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync() {
+        commitSync(Duration.ofMillis(Long.MAX_VALUE));
+    }
+
+    /**
+     * Commit offsets returned on the last {@link #poll(Duration) poll()} for all the subscribed list of topics and
+     * partitions.
+     * <p>
+     * This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after
+     * every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
+     * should not be used.
+     * <p>
+     * This is a synchronous commits and will block until either the commit succeeds, an unrecoverable error is
+     * encountered (in which case it is thrown to the caller), or the passed timeout expires.
+     * <p>
+     * Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
+     * (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.
+     *
+     * @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
+     *             This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
+     *             or if there is an active group with the same groupId which is using group management.
+     * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
+     *             function is called
+     * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
+     *             this function is called
+     * @throws org.apache.kafka.common.errors.AuthenticationException if authentication fails. See the exception for more details
+     * @throws org.apache.kafka.common.errors.AuthorizationException if not authorized to the topic or to the
+     *             configured groupId. See the exception for more details
+     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
+     *             is too large or if the topic does not exist).
+     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
+     *            of the offset commit
+     */
+    @Override
+    public void commitSync(Duration timeout) {
         acquireAndEnsureOpen();
         try {
-            coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);
+            if (!coordinator.commitOffsetsSync(subscriptions.allConsumed(), timeout.toMillis())) {
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
+                        "committing the current consumed offsets");
+            }
         } finally {
             release();
         }
@@ -1355,14 +1398,15 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. if offset metadata
     *             is too large or if the topic does not exist).
     * @throws org.apache.kafka.common.errors.TimeoutException if the timeout expires before successful completion
-     *            of the offset commit
+    *            of the offset commit
     */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         acquireAndEnsureOpen();
         try {
             if (!coordinator.commitOffsetsSync(new HashMap<>(offsets), timeout.toMillis())) {
-                throw new TimeoutException("Committing offsets synchronously took too long.");
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before successfully " +
+                        "committing offsets " + offsets);
             }
         } finally {
             release();
@@ -1584,7 +1628,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 offset = this.subscriptions.position(partition);
                 finishMs = time.milliseconds();
             }
-            if (offset == null) throw new TimeoutException("request timed out, position is unable to be acquired.");
+            if (offset == null)
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the position " +
+                        "for partition " + partition + " could be determined");
             return offset;
         } finally {
             release();
@@ -1640,7 +1686,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(
                     Collections.singleton(partition), timeout.toMillis());
             if (offsets == null) {
-                throw new TimeoutException("Unable to find committed offsets for partition within set duration.");
+                throw new TimeoutException("Timeout of " + timeout.toMillis() + "ms expired before the last " +
+                        "committed offset for partition " + partition + " could be determined");
             }
             return offsets.get(partition);
         } finally {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 3502156..cf1b07f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -252,6 +252,11 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
+    public synchronized void commitSync(Duration timeout) {
+        commitSync(this.subscriptions.allConsumed());
+    }
+
+    @Override
     public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout) {
         commitSync(offsets);
     }
@@ -508,7 +513,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
     }
 
     @Override
-    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration duration) {
+    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {
         return endOffsets(partitions);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 93d4081..9c19af1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -57,6 +57,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -87,29 +88,25 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private MetadataSnapshot assignmentSnapshot;
     private long nextAutoCommitDeadline;
 
-    // hold onto request&future for commited offset requests to enable async calls.
+    // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
 
     private static class PendingCommittedOffsetRequest {
-        private final Set<TopicPartition> request;
-        private final Generation generation;
+        private final Set<TopicPartition> requestedPartitions;
+        private final Generation requestedGeneration;
         private final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response;
 
-        private PendingCommittedOffsetRequest(final Set<TopicPartition> request,
+        private PendingCommittedOffsetRequest(final Set<TopicPartition> requestedPartitions,
                                               final Generation generationAtRequestTime,
-                                              final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response
-        ) {
-            if (request == null) throw new NullPointerException();
-            if (response == null) throw new NullPointerException();
-
-            this.request = request;
-            this.generation = generationAtRequestTime;
-            this.response = response;
+                                              final RequestFuture<Map<TopicPartition, OffsetAndMetadata>> response) {
+            this.requestedPartitions = Objects.requireNonNull(requestedPartitions);
+            this.response = Objects.requireNonNull(response);
+            this.requestedGeneration = generationAtRequestTime;
         }
 
         private boolean sameRequest(final Set<TopicPartition> currentRequest, final Generation currentGeneration) {
-            return (generation == null ? currentGeneration == null : generation.equals(currentGeneration))
-                && request.equals(currentRequest);
+            return (requestedGeneration == null ? currentGeneration == null : requestedGeneration.equals(currentGeneration))
+                && requestedPartitions.equals(currentRequest);
         }
     }
 
@@ -303,6 +300,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
      */
     public boolean poll(final long timeoutMs) {
         final long startTime = time.milliseconds();
+        long currentTime = startTime;
         long elapsed = 0L;
 
         invokeCompletedOffsetCommitCallbacks();
@@ -312,7 +310,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 if (!ensureCoordinatorReady(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                     return false;
                 }
-                elapsed = time.milliseconds() - startTime;
+                currentTime = time.milliseconds();
+                elapsed = currentTime - startTime;
+
             }
 
             if (rejoinNeededOrPending()) {
@@ -323,15 +323,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                     if (!client.ensureFreshMetadata(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                         return false;
                     }
-                    elapsed = time.milliseconds() - startTime;
+                    currentTime = time.milliseconds();
+                    elapsed = currentTime - startTime;
                 }
 
                 if (!ensureActiveGroup(remainingTimeAtLeastZero(timeoutMs, elapsed))) {
                     return false;
                 }
+
+                currentTime = time.milliseconds();
             }
 
-            pollHeartbeat(startTime);
+            pollHeartbeat(currentTime);
         } else {
             // For manually assigned partitions, if there are no ready nodes, await metadata.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
@@ -345,10 +348,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 if (!metadataUpdated && !client.hasReadyNodes(time.milliseconds())) {
                     return false;
                 }
+
+                currentTime = time.milliseconds();
             }
         }
 
-        maybeAutoCommitOffsetsAsync(startTime);
+        maybeAutoCommitOffsetsAsync(currentTime);
         return true;
     }
 
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 0954a25..ba2d930 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -60,7 +60,7 @@
     <li>Bumping the protocol version and restarting can be done any time after the brokers are upgraded. It does not have to be immediately after.
         Similarly for the message format version.</li>
     <li>If you are using Java8 method references in your Kafka Streams code you might need to update your code to resolve method ambiguties.
-        Hot-swaping the jar-file only might not work.</li>
+        Hot-swapping the jar-file only might not work.</li>
 </ol>
 
 <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
@@ -91,6 +91,10 @@
         <code>internal.value.converter=org.apache.kafka.connect.json.JsonConverter</code>
         <code>internal.value.converter.schemas.enable=false</code>
     </li>
+    <li><a href="https://cwiki.apache.org/confluence/x/5kiHB">KIP-266</a> adds overloads to the consumer to support
+        timeout behavior for blocking APIs. In particular, a new <code>poll(Duration)</code> API has been added which
+        does not block for dynamic partition assignment. The old <code>poll(long)</code> API has been deprecated and
+        will be removed in a future version.</li>
 </ul>
 
 <h5><a id="upgrade_200_new_protocols" href="#upgrade_200_new_protocols">New Protocol Versions</a></h5>

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.