You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/27 23:05:28 UTC

kafka git commit: KAFKA-4569; Check for wakeup on every call to KafkaConsumer.poll

Repository: kafka
Updated Branches:
  refs/heads/trunk 1abed91bd -> f3f9a9eaf


KAFKA-4569; Check for wakeup on every call to KafkaConsumer.poll

Author: Armin Braun <me...@obrown.io>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2699 from original-brownbear/KAFKA-4569


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3f9a9ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3f9a9ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3f9a9ea

Branch: refs/heads/trunk
Commit: f3f9a9eafb9aee9d87dd188521a4fd72466abfeb
Parents: 1abed91
Author: Armin Braun <me...@obrown.io>
Authored: Mon Mar 27 16:01:15 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Mar 27 16:05:15 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   |  1 +
 .../internals/ConsumerNetworkClient.java        | 78 +++++++++-----------
 .../org/apache/kafka/clients/MockClient.java    |  4 +-
 .../clients/consumer/KafkaConsumerTest.java     | 18 ++++-
 4 files changed, 51 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 01d9463..a666540 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1033,6 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The fetched records (may be empty)
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
+        client.maybeTriggerWakeup();
         coordinator.poll(time.milliseconds());
 
         // fetch positions if we have partitions we're subscribed to that we

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 890fe7a..92d049a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
@@ -23,6 +34,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -32,19 +44,6 @@ import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.common.errors.InterruptException;
-
 /**
  * Higher level consumer access to the network layer with basic support for request futures. This class
  * is thread-safe, but provides no synchronization for response callbacks. This guarantees that no locks
@@ -62,7 +61,7 @@ public class ConsumerNetworkClient implements Closeable {
     private final Time time;
     private final long retryBackoffMs;
     private final long unsentExpiryMs;
-    private int wakeupDisabledCount = 0;
+    private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
 
     // when requests complete, they are transferred to this queue prior to invocation. The purpose
     // is to avoid invoking them while holding this object's monitor which can open the door for deadlocks.
@@ -204,6 +203,16 @@ public class ConsumerNetworkClient implements Closeable {
      * @param now current time in milliseconds
      */
     public void poll(long timeout, long now, PollCondition pollCondition) {
+        poll(timeout, now, pollCondition, false);
+    }
+
+    /**
+     * Poll for any network IO.
+     * @param timeout timeout in milliseconds
+     * @param now current time in milliseconds
+     * @param disableWakeup if {@code true}, wake-ups are not triggered during this poll
+     */
+    public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {
         // there may be handlers which need to be invoked if we woke up the previous call to poll
         firePendingCompletedRequests();
 
@@ -228,11 +237,11 @@ public class ConsumerNetworkClient implements Closeable {
             // be checked immediately following poll since any subsequent call to client.ready()
             // will reset the disconnect status
             checkDisconnects(now);
-
-            // trigger wakeups after checking for disconnects so that the callbacks will be ready
-            // to be fired on the next call to poll()
-            maybeTriggerWakeup();
-            
+            if (!disableWakeup) {
+                // trigger wakeups after checking for disconnects so that the callbacks will be ready
+                // to be fired on the next call to poll()
+                maybeTriggerWakeup();
+            }
             // throw InterruptException if this thread is interrupted
             maybeThrowInterruptException();
 
@@ -256,12 +265,7 @@ public class ConsumerNetworkClient implements Closeable {
      * nor will it execute any delayed tasks.
      */
     public void pollNoWakeup() {
-        disableWakeups();
-        try {
-            poll(0, time.milliseconds(), null);
-        } finally {
-            enableWakeups();
-        }
+        poll(0, time.milliseconds(), null, true);
     }
 
     /**
@@ -409,14 +413,14 @@ public class ConsumerNetworkClient implements Closeable {
         return requestsSent;
     }
 
-    private void maybeTriggerWakeup() {
-        if (wakeupDisabledCount == 0 && wakeup.get()) {
+    public void maybeTriggerWakeup() {
+        if (!wakeupDisabled.get() && wakeup.get()) {
             log.trace("Raising wakeup exception in response to user wakeup");
             wakeup.set(false);
             throw new WakeupException();
         }
     }
-    
+
     private void maybeThrowInterruptException() {
         if (Thread.interrupted()) {
             throw new InterruptException(new InterruptedException());
@@ -424,23 +428,7 @@ public class ConsumerNetworkClient implements Closeable {
     }
 
     public void disableWakeups() {
-        synchronized (this) {
-            wakeupDisabledCount++;
-        }
-    }
-
-    public void enableWakeups() {
-        synchronized (this) {
-            if (wakeupDisabledCount <= 0)
-                throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
-
-            wakeupDisabledCount--;
-
-            // re-wakeup the client if the flag was set since previous wake-up call
-            // could be cleared by poll(0) while wakeups were disabled
-            if (wakeupDisabledCount == 0 && wakeup.get())
-                this.client.wakeup();
-        }
+        wakeupDisabled.set(true);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index b871a49..f4141a5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -178,8 +178,8 @@ public class MockClient implements KafkaClient {
             }
         }
 
-        while (!this.responses.isEmpty()) {
-            ClientResponse response = this.responses.poll();
+        ClientResponse response;
+        while ((response = this.responses.poll()) != null) {
             response.onComplete();
         }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ec60209..7f20472 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
@@ -646,16 +647,16 @@ public class KafkaConsumerTest {
     }
 
     @Test
-    public void testWakeupWithFetchDataAvailable() {
+    public void testWakeupWithFetchDataAvailable() throws Exception {
         int rebalanceTimeoutMs = 60000;
-        int sessionTimeoutMs = 30000;
+        final int sessionTimeoutMs = 30000;
         int heartbeatIntervalMs = 3000;
 
         // adjust auto commit interval lower than heartbeat so we don't need to deal with
         // a concurrent heartbeat request
         int autoCommitIntervalMs = 1000;
 
-        Time time = new MockTime();
+        final Time time = new MockTime();
         Cluster cluster = TestUtils.singletonCluster(topic, 1);
         Node node = cluster.nodes().get(0);
 
@@ -692,6 +693,17 @@ public class KafkaConsumerTest {
         // the next poll should return the completed fetch
         ConsumerRecords<String, String> records = consumer.poll(0);
         assertEquals(5, records.count());
+        // Increment time asynchronously to clear timeouts in closing the consumer
+        final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
+        exec.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                time.sleep(sessionTimeoutMs);
+            }
+        }, 0L, 10L, TimeUnit.MILLISECONDS);
+        consumer.close();
+        exec.shutdownNow();
+        exec.awaitTermination(5L, TimeUnit.SECONDS);
     }
 
     @Test