You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/19 02:19:51 UTC
kafka git commit: KAFKA-2860: better handling of auto commit errors
Repository: kafka
Updated Branches:
refs/heads/trunk f154956a7 -> fa4244745
KAFKA-2860: better handling of auto commit errors
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Guozhang Wang
Closes #553 from hachikuji/KAFKA-2860
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa424474
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa424474
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa424474
Branch: refs/heads/trunk
Commit: fa4244745fc363410fd5bc21e0b045f8124a8f9c
Parents: f154956
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 18 17:19:47 2015 -0800
Committer: Confluent <co...@Confluents-MacBook-Pro.local>
Committed: Wed Nov 18 17:19:47 2015 -0800
----------------------------------------------------------------------
.../consumer/internals/AbstractCoordinator.java | 10 ++-
.../consumer/internals/ConsumerCoordinator.java | 91 +++++++++++++++-----
2 files changed, 79 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa424474/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index cab7065..9aa1aaf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -242,12 +242,16 @@ public abstract class AbstractCoordinator implements Closeable {
private class HeartbeatTask implements DelayedTask {
+ private boolean requestInFlight = false;
+
public void reset() {
// start or restart the heartbeat task to be executed at the next chance
long now = time.milliseconds();
heartbeat.resetSessionTimeout(now);
client.unschedule(this);
- client.schedule(this, now);
+
+ if (!requestInFlight)
+ client.schedule(this, now);
}
@Override
@@ -270,10 +274,13 @@ public abstract class AbstractCoordinator implements Closeable {
client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
} else {
heartbeat.sentHeartbeat(now);
+ requestInFlight = true;
+
RequestFuture<Void> future = sendHeartbeatRequest();
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
+ requestInFlight = false;
long now = time.milliseconds();
heartbeat.receiveHeartbeat(now);
long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
@@ -282,6 +289,7 @@ public abstract class AbstractCoordinator implements Closeable {
@Override
public void onFailure(RuntimeException e) {
+ requestInFlight = false;
client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
}
});
http://git-wip-us.apache.org/repos/asf/kafka/blob/fa424474/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2ee3a4d..f6d1029 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -69,7 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
- private DelayedTask autoCommitTask = null;
+ private final AutoCommitTask autoCommitTask;
/**
* Initialize the coordination manager.
@@ -112,9 +112,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
addMetadataListener();
- if (autoCommitEnabled)
- this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs);
-
+ this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
}
@@ -179,6 +177,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);
+ // restart the autocommit task if needed
+ if (autoCommitEnabled)
+ autoCommitTask.enable();
+
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
@@ -309,8 +311,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// we do not need to re-enable wakeups since we are closing already
client.disableWakeups();
try {
- if (autoCommitTask != null)
- client.unschedule(autoCommitTask);
maybeAutoCommitOffsetsSync();
} finally {
super.close();
@@ -362,25 +362,74 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
}
}
- private DelayedTask scheduleAutoCommitTask(final long interval) {
- DelayedTask task = new DelayedTask() {
- public void run(long now) {
- commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
- @Override
- public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- if (exception != null)
- log.error("Auto offset commit failed.", exception);
- }
- });
- client.schedule(this, now + interval);
+ private class AutoCommitTask implements DelayedTask {
+ private final long interval;
+ private boolean enabled = false;
+ private boolean requestInFlight = false;
+
+ public AutoCommitTask(long interval) {
+ this.interval = interval;
+ }
+
+ public void enable() {
+ if (!enabled) {
+ // there shouldn't be any instances scheduled, but call unschedule anyway to ensure
+ // that this task is only ever scheduled once
+ client.unschedule(this);
+ this.enabled = true;
+
+ if (!requestInFlight) {
+ long now = time.milliseconds();
+ client.schedule(this, interval + now);
+ }
+ }
+ }
+
+ public void disable() {
+ this.enabled = false;
+ client.unschedule(this);
+ }
+
+ private void reschedule(long at) {
+ if (enabled)
+ client.schedule(this, at);
+ }
+
+ public void run(final long now) {
+ if (!enabled)
+ return;
+
+ if (coordinatorUnknown()) {
+ log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff");
+ client.schedule(this, now + retryBackoffMs);
+ return;
}
- };
- client.schedule(task, time.milliseconds() + interval);
- return task;
+
+ requestInFlight = true;
+ commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+ requestInFlight = false;
+ if (exception == null) {
+ reschedule(now + interval);
+ } else if (exception instanceof SendFailedException) {
+ log.debug("Failed to send automatic offset commit, will retry immediately");
+ reschedule(now);
+ } else {
+ log.warn("Auto offset commit failed: {}", exception.getMessage());
+ reschedule(now + interval);
+ }
+ }
+ });
+ }
}
private void maybeAutoCommitOffsetsSync() {
if (autoCommitEnabled) {
+ // disable periodic commits prior to committing synchronously. note that they will
+ // be re-enabled after a rebalance completes
+ autoCommitTask.disable();
+
try {
commitOffsetsSync(subscriptions.allConsumed());
} catch (WakeupException e) {
@@ -388,7 +437,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
throw e;
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
- log.error("Auto offset commit failed.", e);
+ log.warn("Auto offset commit failed: {}", e.getMessage());
}
}
}