You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/12/03 20:01:16 UTC
kafka git commit: KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss
Repository: kafka
Updated Branches:
refs/heads/trunk 5b5f6bbe6 -> 13e483ade
KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #623 from hachikuji/KAFKA-2942
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13e483ad
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13e483ad
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13e483ad
Branch: refs/heads/trunk
Commit: 13e483adeee8d968397a21bde3bb159516f26ff0
Parents: 5b5f6bb
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Dec 3 11:01:08 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Dec 3 11:01:08 2015 -0800
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 11 +++++------
.../internals/ConsumerNetworkClient.java | 19 +++++++++++++++----
2 files changed, 20 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/13e483ad/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9b36af6..c559593 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -830,13 +830,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// and avoid block waiting for their responses to enable pipelining while the user
// is handling the fetched records.
//
- // NOTE that in this case we need to disable wakeups for the non-blocking poll since
- // the consumed positions has already been updated and hence we must return these
- // records to users to process before being interrupted
+ // NOTE that we use quickPoll() in this case, which disables wakeups and delayed
+ // task execution, since the consumed positions have already been updated and we
+ // must return these records to users to process before being interrupted or
+ // auto-committing offsets
fetcher.initFetches(metadata.fetch());
- client.disableWakeups();
- client.poll(0);
- client.enableWakeups();
+ client.quickPoll();
return new ConsumerRecords<>(records);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/13e483ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 84c312e..f707d6f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable {
long remaining = timeout;
long now = begin;
do {
- poll(remaining, now);
+ poll(remaining, now, true);
now = time.milliseconds();
long elapsed = now - begin;
remaining = timeout - elapsed;
@@ -190,10 +190,20 @@ public class ConsumerNetworkClient implements Closeable {
* @throws WakeupException if {@link #wakeup()} is called from another thread
*/
public void poll(long timeout) {
- poll(timeout, time.milliseconds());
+ poll(timeout, time.milliseconds(), true);
}
- private void poll(long timeout, long now) {
+ /**
+ * Poll for network IO and return immediately. This will not trigger wakeups,
+ * nor will it execute any delayed tasks.
+ */
+ public void quickPoll() {
+ disableWakeups();
+ poll(0, time.milliseconds(), false);
+ enableWakeups();
+ }
+
+ private void poll(long timeout, long now, boolean executeDelayedTasks) {
// send all the requests we can send now
trySend(now);
@@ -209,7 +219,8 @@ public class ConsumerNetworkClient implements Closeable {
checkDisconnects(now);
// execute scheduled tasks
- delayedTasks.poll(now);
+ if (executeDelayedTasks)
+ delayedTasks.poll(now);
// try again to send requests since buffer space may have been
// cleared or a connect finished in the poll