You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/10 23:49:48 UTC
kafka git commit: KAFKA-2799: skip wakeup in the follow-up poll()
call.
Repository: kafka
Updated Branches:
refs/heads/trunk 8db55618d -> 9d9bb708b
KAFKA-2799: skip wakeup in the follow-up poll() call.
Author: Guozhang Wang <wa...@gmail.com>
Reviewers: Jason Gustafson
Closes #490 from guozhangwang/K2799
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9d9bb708
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9d9bb708
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9d9bb708
Branch: refs/heads/trunk
Commit: 9d9bb708bf59c93672306cd731b89d7df114bba7
Parents: 8db5561
Author: Guozhang Wang <wa...@gmail.com>
Authored: Tue Nov 10 14:55:47 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Nov 10 14:55:47 2015 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 18 +++++++++++++-----
.../consumer/internals/AbstractCoordinator.java | 1 +
.../consumer/internals/ConsumerCoordinator.java | 1 +
.../consumer/internals/ConsumerNetworkClient.java | 17 ++++++++++-------
.../kafka/clients/consumer/internals/Fetcher.java | 2 ++
5 files changed, 27 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 89b2f0b..660c530 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -828,11 +828,17 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
do {
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
if (!records.isEmpty()) {
- // if data is available, then return it, but first send off the
- // next round of fetches to enable pipelining while the user is
- // handling the fetched records.
+ // before returning the fetched records, we can send off the next round of fetches
+ // without blocking on their responses, which enables pipelining while the user
+ // is handling the fetched records.
+ //
+ // NOTE that in this case we need to disable wakeups for the non-blocking poll since
+ // the consumed positions have already been updated and hence we must return these
+ // records to users to process before being interrupted
fetcher.initFetches(metadata.fetch());
+ client.disableWakeups();
client.poll(0);
+ client.enableWakeups();
return new ConsumerRecords<>(records);
}
@@ -868,11 +874,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// init any new fetches (won't resend pending fetches)
Cluster cluster = this.metadata.fetch();
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
- // Avoid block waiting for response if we already have data available, e.g. from another API call to commit.
+
+ // if data is available already, e.g. from a previous network client poll() call to commit,
+ // then just return it immediately
if (!records.isEmpty()) {
- client.poll(0);
return records;
}
+
fetcher.initFetches(cluster);
client.poll(timeout);
return fetcher.fetchedRecords();
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index a12c6c1..cab7065 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -539,6 +539,7 @@ public abstract class AbstractCoordinator implements Closeable {
*/
@Override
public void close() {
+ // we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
maybeLeaveGroup();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 95aad6d..faef7ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -306,6 +306,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
@Override
public void close() {
+ // we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
try {
if (autoCommitTask != null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 20eb45d..f2e215d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -52,7 +52,8 @@ public class ConsumerNetworkClient implements Closeable {
private final Metadata metadata;
private final Time time;
private final long retryBackoffMs;
- private boolean wakeupsEnabled = true;
+ // the wakeups-enabled flag needs to be volatile since it may be accessed concurrently
+ volatile private boolean wakeupsEnabled = true;
public ConsumerNetworkClient(KafkaClient client,
Metadata metadata,
@@ -141,10 +142,8 @@ public class ConsumerNetworkClient implements Closeable {
* on the current poll if one is active, or the next poll.
*/
public void wakeup() {
- if (wakeupsEnabled) {
- this.wakeup.set(true);
- this.client.wakeup();
- }
+ this.wakeup.set(true);
+ this.client.wakeup();
}
/**
@@ -301,7 +300,7 @@ public class ConsumerNetworkClient implements Closeable {
private void clientPoll(long timeout, long now) {
client.poll(timeout, now);
- if (wakeup.get()) {
+ if (wakeupsEnabled && wakeup.get()) {
failUnsentRequests();
wakeup.set(false);
throw new WakeupException();
@@ -309,12 +308,16 @@ public class ConsumerNetworkClient implements Closeable {
}
public void disableWakeups() {
- this.wakeup.set(false);
this.wakeupsEnabled = false;
}
public void enableWakeups() {
this.wakeupsEnabled = true;
+
+ // re-wakeup the client if the flag was set, since the previous wake-up call
+ // could have been cleared by poll(0) while wakeups were disabled
+ if (wakeup.get())
+ this.client.wakeup();
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/9d9bb708/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 4f0fbed..e988b2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -340,6 +340,8 @@ public class Fetcher<K, V> {
/**
* Return the fetched records, empty the record buffer and update the consumed position.
*
+ * NOTE: returning empty records guarantees the consumed positions are NOT updated.
+ *
* @return The fetched records per partition
* @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
* the defaultResetPolicy is NONE