You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/03/27 23:05:28 UTC
kafka git commit: KAFKA-4569;
Check for wakeup on every call to KafkaConsumer.poll
Repository: kafka
Updated Branches:
refs/heads/trunk 1abed91bd -> f3f9a9eaf
KAFKA-4569; Check for wakeup on every call to KafkaConsumer.poll
Author: Armin Braun <me...@obrown.io>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>
Closes #2699 from original-brownbear/KAFKA-4569
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f3f9a9ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f3f9a9ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f3f9a9ea
Branch: refs/heads/trunk
Commit: f3f9a9eafb9aee9d87dd188521a4fd72466abfeb
Parents: 1abed91
Author: Armin Braun <me...@obrown.io>
Authored: Mon Mar 27 16:01:15 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Mar 27 16:05:15 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 1 +
.../internals/ConsumerNetworkClient.java | 78 +++++++++-----------
.../org/apache/kafka/clients/MockClient.java | 4 +-
.../clients/consumer/KafkaConsumerTest.java | 18 ++++-
4 files changed, 51 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 01d9463..a666540 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1033,6 +1033,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The fetched records (may be empty)
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
+ client.maybeTriggerWakeup();
coordinator.poll(time.milliseconds());
// fetch positions if we have partitions we're subscribed to that we
http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 890fe7a..92d049a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -16,6 +16,17 @@
*/
package org.apache.kafka.clients.consumer.internals;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
@@ -23,6 +34,7 @@ import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
+import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -32,19 +44,6 @@ import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.kafka.common.errors.InterruptException;
-
/**
* Higher level consumer access to the network layer with basic support for request futures. This class
* is thread-safe, but provides no synchronization for response callbacks. This guarantees that no locks
@@ -62,7 +61,7 @@ public class ConsumerNetworkClient implements Closeable {
private final Time time;
private final long retryBackoffMs;
private final long unsentExpiryMs;
- private int wakeupDisabledCount = 0;
+ private final AtomicBoolean wakeupDisabled = new AtomicBoolean();
// when requests complete, they are transferred to this queue prior to invocation. The purpose
// is to avoid invoking them while holding this object's monitor which can open the door for deadlocks.
@@ -204,6 +203,16 @@ public class ConsumerNetworkClient implements Closeable {
* @param now current time in milliseconds
*/
public void poll(long timeout, long now, PollCondition pollCondition) {
+ poll(timeout, now, pollCondition, false);
+ }
+
+ /**
+ * Poll for any network IO.
+ * @param timeout timeout in milliseconds
+ * @param now current time in milliseconds
+ * @param disableWakeup If TRUE disable triggering wake-ups
+ */
+ public void poll(long timeout, long now, PollCondition pollCondition, boolean disableWakeup) {
// there may be handlers which need to be invoked if we woke up the previous call to poll
firePendingCompletedRequests();
@@ -228,11 +237,11 @@ public class ConsumerNetworkClient implements Closeable {
// be checked immediately following poll since any subsequent call to client.ready()
// will reset the disconnect status
checkDisconnects(now);
-
- // trigger wakeups after checking for disconnects so that the callbacks will be ready
- // to be fired on the next call to poll()
- maybeTriggerWakeup();
-
+ if (!disableWakeup) {
+ // trigger wakeups after checking for disconnects so that the callbacks will be ready
+ // to be fired on the next call to poll()
+ maybeTriggerWakeup();
+ }
// throw InterruptException if this thread is interrupted
maybeThrowInterruptException();
@@ -256,12 +265,7 @@ public class ConsumerNetworkClient implements Closeable {
* nor will it execute any delayed tasks.
*/
public void pollNoWakeup() {
- disableWakeups();
- try {
- poll(0, time.milliseconds(), null);
- } finally {
- enableWakeups();
- }
+ poll(0, time.milliseconds(), null, true);
}
/**
@@ -409,14 +413,14 @@ public class ConsumerNetworkClient implements Closeable {
return requestsSent;
}
- private void maybeTriggerWakeup() {
- if (wakeupDisabledCount == 0 && wakeup.get()) {
+ public void maybeTriggerWakeup() {
+ if (!wakeupDisabled.get() && wakeup.get()) {
log.trace("Raising wakeup exception in response to user wakeup");
wakeup.set(false);
throw new WakeupException();
}
}
-
+
private void maybeThrowInterruptException() {
if (Thread.interrupted()) {
throw new InterruptException(new InterruptedException());
@@ -424,23 +428,7 @@ public class ConsumerNetworkClient implements Closeable {
}
public void disableWakeups() {
- synchronized (this) {
- wakeupDisabledCount++;
- }
- }
-
- public void enableWakeups() {
- synchronized (this) {
- if (wakeupDisabledCount <= 0)
- throw new IllegalStateException("Cannot enable wakeups since they were never disabled");
-
- wakeupDisabledCount--;
-
- // re-wakeup the client if the flag was set since previous wake-up call
- // could be cleared by poll(0) while wakeups were disabled
- if (wakeupDisabledCount == 0 && wakeup.get())
- this.client.wakeup();
- }
+ wakeupDisabled.set(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index b871a49..f4141a5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -178,8 +178,8 @@ public class MockClient implements KafkaClient {
}
}
- while (!this.responses.isEmpty()) {
- ClientResponse response = this.responses.poll();
+ ClientResponse response;
+ while ((response = this.responses.poll()) != null) {
response.onComplete();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f3f9a9ea/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ec60209..7f20472 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.clients.consumer;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
@@ -646,16 +647,16 @@ public class KafkaConsumerTest {
}
@Test
- public void testWakeupWithFetchDataAvailable() {
+ public void testWakeupWithFetchDataAvailable() throws Exception {
int rebalanceTimeoutMs = 60000;
- int sessionTimeoutMs = 30000;
+ final int sessionTimeoutMs = 30000;
int heartbeatIntervalMs = 3000;
// adjust auto commit interval lower than heartbeat so we don't need to deal with
// a concurrent heartbeat request
int autoCommitIntervalMs = 1000;
- Time time = new MockTime();
+ final Time time = new MockTime();
Cluster cluster = TestUtils.singletonCluster(topic, 1);
Node node = cluster.nodes().get(0);
@@ -692,6 +693,17 @@ public class KafkaConsumerTest {
// the next poll should return the completed fetch
ConsumerRecords<String, String> records = consumer.poll(0);
assertEquals(5, records.count());
+ // Increment time asynchronously to clear timeouts in closing the consumer
+ final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
+ exec.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ time.sleep(sessionTimeoutMs);
+ }
+ }, 0L, 10L, TimeUnit.MILLISECONDS);
+ consumer.close();
+ exec.shutdownNow();
+ exec.awaitTermination(5L, TimeUnit.SECONDS);
}
@Test