You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 23:49:48 UTC

kafka git commit: KAFKA-2799: skip wakeup in the follow-up poll() call.

Repository: kafka
Updated Branches:
  refs/heads/trunk 8db55618d -> 9d9bb708b


KAFKA-2799: skip wakeup in the follow-up poll() call.

Author: Guozhang Wang <wa...@gmail.com>

Reviewers: Jason Gustafson

Closes #490 from guozhangwang/K2799


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d9bb708
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d9bb708
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d9bb708

Branch: refs/heads/trunk
Commit: 9d9bb708bf59c93672306cd731b89d7df114bba7
Parents: 8db5561
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 10 14:55:47 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 10 14:55:47 2015 -0800

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java     | 18 +++++++++++++-----
 .../consumer/internals/AbstractCoordinator.java   |  1 +
 .../consumer/internals/ConsumerCoordinator.java   |  1 +
 .../consumer/internals/ConsumerNetworkClient.java | 17 ++++++++++-------
 .../kafka/clients/consumer/internals/Fetcher.java |  2 ++
 5 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 89b2f0b..660c530 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -828,11 +828,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             do {
                 Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                 if (!records.isEmpty()) {
-                    // if data is available, then return it, but first send off the
-                    // next round of fetches to enable pipelining while the user is
-                    // handling the fetched records.
+                    // before returning the fetched records, we can send off the next round of fetches
+                    // and avoid block waiting for their responses to enable pipelining while the user
+                    // is handling the fetched records.
+                    //
+                    // NOTE that in this case we need to disable wakeups for the non-blocking poll since
+                    // the consumed positions has already been updated and hence we must return these
+                    // records to users to process before being interrupted
                     fetcher.initFetches(metadata.fetch());
+                    client.disableWakeups();
                     client.poll(0);
+                    client.enableWakeups();
                     return new ConsumerRecords<>(records);
                 }
 
@@ -868,11 +874,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         // init any new fetches (won't resend pending fetches)
         Cluster cluster = this.metadata.fetch();
         Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
-        // Avoid block waiting for response if we already have data available, e.g. from another API call to commit.
+
+        // if data is available already, e.g. from a previous network client poll() call to commit,
+        // then just return it immediately
         if (!records.isEmpty()) {
-            client.poll(0);
             return records;
         }
+
         fetcher.initFetches(cluster);
         client.poll(timeout);
         return fetcher.fetchedRecords();

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a12c6c1..cab7065 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -539,6 +539,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     @Override
     public void close() {
+        // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
         maybeLeaveGroup();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 95aad6d..faef7ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -306,6 +306,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     @Override
     public void close() {
+        // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
         try {
             if (autoCommitTask != null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 20eb45d..f2e215d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -52,7 +52,8 @@ public class ConsumerNetworkClient implements Closeable {
     private final Metadata metadata;
     private final Time time;
     private final long retryBackoffMs;
-    private boolean wakeupsEnabled = true;
+    // wakeup enabled flag need to be volatile since it is allowed to be accessed concurrently
+    volatile private boolean wakeupsEnabled = true;
 
     public ConsumerNetworkClient(KafkaClient client,
                                  Metadata metadata,
@@ -141,10 +142,8 @@ public class ConsumerNetworkClient implements Closeable {
      * on the current poll if one is active, or the next poll.
      */
     public void wakeup() {
-        if (wakeupsEnabled) {
-            this.wakeup.set(true);
-            this.client.wakeup();
-        }
+        this.wakeup.set(true);
+        this.client.wakeup();
     }
 
     /**
@@ -301,7 +300,7 @@ public class ConsumerNetworkClient implements Closeable {
 
     private void clientPoll(long timeout, long now) {
         client.poll(timeout, now);
-        if (wakeup.get()) {
+        if (wakeupsEnabled && wakeup.get()) {
             failUnsentRequests();
             wakeup.set(false);
             throw new WakeupException();
@@ -309,12 +308,16 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     public void disableWakeups() {
-        this.wakeup.set(false);
         this.wakeupsEnabled = false;
     }
 
     public void enableWakeups() {
         this.wakeupsEnabled = true;
+
+        // re-wakeup the client if the flag was set since previous wake-up call
+        // could be cleared by poll(0) while wakeups were disabled
+        if (wakeup.get())
+            this.client.wakeup();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4f0fbed..e988b2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -340,6 +340,8 @@ public class Fetcher<K, V> {
     /**
      * Return the fetched records, empty the record buffer and update the consumed position.
      *
+     * NOTE: returning empty records guarantees the consumed position are NOT updated.
+     *
      * @return The fetched records per partition
      * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
      *         the defaultResetPolicy is NONE