You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/03 20:01:16 UTC

kafka git commit: KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss

Repository: kafka
Updated Branches:
  refs/heads/trunk 5b5f6bbe6 -> 13e483ade


KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #623 from hachikuji/KAFKA-2942


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13e483ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13e483ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13e483ad

Branch: refs/heads/trunk
Commit: 13e483adeee8d968397a21bde3bb159516f26ff0
Parents: 5b5f6bb
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Dec 3 11:01:08 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 3 11:01:08 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java    | 11 +++++------
 .../internals/ConsumerNetworkClient.java         | 19 +++++++++++++++----
 2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/13e483ad/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9b36af6..c559593 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -830,13 +830,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // and avoid block waiting for their responses to enable pipelining while the user
                     // is handling the fetched records.
                     //
-                    // NOTE that in this case we need to disable wakeups for the non-blocking poll since
-                    // the consumed positions has already been updated and hence we must return these
-                    // records to users to process before being interrupted
+                    // NOTE that we use quickPoll() in this case, which disables wakeups and delayed
+                    // task execution, since the consumed positions have already been updated and we
+                    // must return these records to users to process before being interrupted or
+                    // auto-committing offsets
                     fetcher.initFetches(metadata.fetch());
-                    client.disableWakeups();
-                    client.poll(0);
-                    client.enableWakeups();
+                    client.quickPoll();
                     return new ConsumerRecords<>(records);
                 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/13e483ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 84c312e..f707d6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable {
         long remaining = timeout;
         long now = begin;
         do {
-            poll(remaining, now);
+            poll(remaining, now, true);
             now = time.milliseconds();
             long elapsed = now - begin;
             remaining = timeout - elapsed;
@@ -190,10 +190,20 @@ public class ConsumerNetworkClient implements Closeable {
      * @throws WakeupException if {@link #wakeup()} is called from another thread
      */
     public void poll(long timeout) {
-        poll(timeout, time.milliseconds());
+        poll(timeout, time.milliseconds(), true);
     }
 
-    private void poll(long timeout, long now) {
+    /**
+     * Poll for network IO and return immediately. This will not trigger wakeups,
+     * nor will it execute any delayed tasks.
+     */
+    public void quickPoll() {
+        disableWakeups();
+        poll(0, time.milliseconds(), false);
+        enableWakeups();
+    }
+
+    private void poll(long timeout, long now, boolean executeDelayedTasks) {
         // send all the requests we can send now
         trySend(now);
 
@@ -209,7 +219,8 @@ public class ConsumerNetworkClient implements Closeable {
         checkDisconnects(now);
 
         // execute scheduled tasks
-        delayedTasks.poll(now);
+        if (executeDelayedTasks)
+            delayedTasks.poll(now);
 
         // try again to send requests since buffer space may have been
         // cleared or a connect finished in the poll