Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 07:39:00 UTC

[jira] [Commented] (KAFKA-3824) Docs indicate auto.commit breaks at least once delivery but that is incorrect

    [ https://issues.apache.org/jira/browse/KAFKA-3824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16375977#comment-16375977 ] 

ASF GitHub Bot commented on KAFKA-3824:
---------------------------------------

hachikuji closed pull request #1502: KAFKA-3824: Clarify the at least once delivery with auto commit enabled.
URL: https://github.com/apache/kafka/pull/1502

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 04b41ba1d13..1c249cc34b7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -983,17 +983,18 @@ public void assign(Collection<TopicPartition> partitions) {
 
         long now = time.milliseconds();
 
-        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
+        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records.
+        // It is crucial for "at least once" delivery semantics to ensure that offset commits can
+        // only be sent prior to updating the position in Fetcher's fetchedRecords method.
         client.executeDelayedTasks(now);
 
-        // init any new fetches (won't resend pending fetches)
-        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-
         // if data is available already, e.g. from a previous network client poll() call to commit,
-        // then just return it immediately
+        // then just return it immediately to avoid blocking in poll.
+        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
         if (!records.isEmpty())
             return records;
 
+        // send new fetches to any brokers which don't already have a request in flight.
         fetcher.sendFetches();
         client.poll(timeout, now);
         return fetcher.fetchedRecords();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index ec351153f05..77f5e1f385c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -31,12 +31,13 @@
  * or with {@link #assignFromSubscribed(Collection)} (automatic assignment from subscription).
  *
  * Once assigned, the partition is not considered "fetchable" until its initial position has
- * been set with {@link #seek(TopicPartition, long)}. Fetchable partitions track a fetch
- * position which is used to set the offset of the next fetch, and a consumed position
- * which is the last offset that has been returned to the user. You can suspend fetching
- * from a partition through {@link #pause(TopicPartition)} without affecting the fetched/consumed
- * offsets. The partition will remain unfetchable until the {@link #resume(TopicPartition)} is
- * used. You can also query the pause state independently with {@link #isPaused(TopicPartition)}.
+ * been set with {@link #seek(TopicPartition, long)}. Each fetchable partition tracks a position
+ * which is used to set the offset of the next fetch. The position used by the next fetch is
+ * one larger than the highest offset the consumer has seen in that partition. You can suspend
+ * fetching from a partition through {@link #pause(TopicPartition)} without affecting the
+ * fetched/consumed offsets. The partition will remain unfetchable until the
+ * {@link #resume(TopicPartition)} is used. You can also query the pause
+ * state independently with {@link #isPaused(TopicPartition)}.
  *
  * Note that pause state as well as fetch/consumed positions are not preserved when partition
  * assignment is changed whether directly by the user or through a group rebalance.
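
The ordering documented in the KafkaConsumer.java hunk above (run the auto-commit before fetchedRecords() advances the position) can be illustrated with a small stand-alone model. This is only a sketch under stated assumptions: ToyConsumer and its fields are hypothetical names invented for illustration and are not part of the Kafka client, and the toy commits on every poll() rather than on the auto.commit.interval.ms timer that the real consumer uses. It also assumes the application finishes processing one batch before calling poll() again, which the at-least-once guarantee with auto commit depends on.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AutoCommitOrderingSketch {

    /** Hypothetical toy model of one partition's consumer; NOT the real KafkaConsumer. */
    static final class ToyConsumer {
        private final List<String> log;     // stand-in for the records of a single partition
        private final int maxPollRecords;   // illustrative batch size
        private long position = 0;          // offset of the next record to hand to the caller
        private long committed = 0;         // offset recorded by the last (auto) commit

        ToyConsumer(List<String> log, int maxPollRecords) {
            this.log = log;
            this.maxPollRecords = maxPollRecords;
        }

        List<String> poll() {
            // Step 1: auto-commit first. At this point the position covers only
            // records returned by EARLIER poll() calls, never the batch below.
            committed = position;

            // Step 2: only now hand out new records and advance the position.
            int start = (int) position;
            int end = Math.min(log.size(), start + maxPollRecords);
            List<String> batch = new ArrayList<>(log.subList(start, end));
            position = end;
            return batch;
        }

        long committed() { return committed; }
        long position() { return position; }
    }

    public static void main(String[] args) {
        ToyConsumer consumer = new ToyConsumer(Arrays.asList("a", "b", "c", "d", "e"), 2);
        List<String> batch;
        while (!(batch = consumer.poll()).isEmpty()) {
            // The committed offset never includes the batch being processed here.
            System.out.println("committed=" + consumer.committed() + " processing " + batch);
        }
        System.out.println("final committed=" + consumer.committed());
    }
}
```

In this model a crash while a batch is being processed leaves the committed offset at or before the start of that batch, so the batch is redelivered after a restart rather than lost. Swapping the two steps in poll() would let the commit cover records the caller has not yet seen, which is the failure the added comment in the diff warns against.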


 


> Docs indicate auto.commit breaks at least once delivery but that is incorrect
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-3824
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3824
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 0.10.0.0
>            Reporter: Jay Kreps
>            Assignee: Jason Gustafson
>            Priority: Major
>              Labels: newbie
>             Fix For: 0.10.1.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The javadocs for the new consumer indicate that auto commit breaks at least once delivery. This is no longer correct as of 0.10. 
> http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


