You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/19 02:19:51 UTC

kafka git commit: KAFKA-2860: better handling of auto commit errors

Repository: kafka
Updated Branches:
  refs/heads/trunk f154956a7 -> fa4244745


KAFKA-2860: better handling of auto commit errors

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang

Closes #553 from hachikuji/KAFKA-2860


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fa424474
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fa424474
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fa424474

Branch: refs/heads/trunk
Commit: fa4244745fc363410fd5bc21e0b045f8124a8f9c
Parents: f154956
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Nov 18 17:19:47 2015 -0800
Committer: Confluent <co...@Confluents-MacBook-Pro.local>
Committed: Wed Nov 18 17:19:47 2015 -0800

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 10 ++-
 .../consumer/internals/ConsumerCoordinator.java | 91 +++++++++++++++-----
 2 files changed, 79 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fa424474/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index cab7065..9aa1aaf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -242,12 +242,16 @@ public abstract class AbstractCoordinator implements Closeable {
 
     private class HeartbeatTask implements DelayedTask {
 
+        private boolean requestInFlight = false;
+
         public void reset() {
             // start or restart the heartbeat task to be executed at the next chance
             long now = time.milliseconds();
             heartbeat.resetSessionTimeout(now);
             client.unschedule(this);
-            client.schedule(this, now);
+
+            if (!requestInFlight)
+                client.schedule(this, now);
         }
 
         @Override
@@ -270,10 +274,13 @@ public abstract class AbstractCoordinator implements Closeable {
                 client.schedule(this, now + heartbeat.timeToNextHeartbeat(now));
             } else {
                 heartbeat.sentHeartbeat(now);
+                requestInFlight = true;
+
                 RequestFuture<Void> future = sendHeartbeatRequest();
                 future.addListener(new RequestFutureListener<Void>() {
                     @Override
                     public void onSuccess(Void value) {
+                        requestInFlight = false;
                         long now = time.milliseconds();
                         heartbeat.receiveHeartbeat(now);
                         long nextHeartbeatTime = now + heartbeat.timeToNextHeartbeat(now);
@@ -282,6 +289,7 @@ public abstract class AbstractCoordinator implements Closeable {
 
                     @Override
                     public void onFailure(RuntimeException e) {
+                        requestInFlight = false;
                         client.schedule(HeartbeatTask.this, time.milliseconds() + retryBackoffMs);
                     }
                 });

http://git-wip-us.apache.org/repos/asf/kafka/blob/fa424474/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2ee3a4d..f6d1029 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -69,7 +69,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final SubscriptionState subscriptions;
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
-    private DelayedTask autoCommitTask = null;
+    private final AutoCommitTask autoCommitTask;
 
     /**
      * Initialize the coordination manager.
@@ -112,9 +112,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         addMetadataListener();
 
-        if (autoCommitEnabled)
-            this.autoCommitTask = scheduleAutoCommitTask(autoCommitIntervalMs);
-
+        this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix, metricTags);
     }
 
@@ -179,6 +177,10 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);
 
+        // restart the autocommit task if needed
+        if (autoCommitEnabled)
+            autoCommitTask.enable();
+
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
         log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
@@ -309,8 +311,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // we do not need to re-enable wakeups since we are closing already
         client.disableWakeups();
         try {
-            if (autoCommitTask != null)
-                client.unschedule(autoCommitTask);
             maybeAutoCommitOffsetsSync();
         } finally {
             super.close();
@@ -362,25 +362,74 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         }
     }
 
-    private DelayedTask scheduleAutoCommitTask(final long interval) {
-        DelayedTask task = new DelayedTask() {
-            public void run(long now) {
-                commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
-                    @Override
-                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                        if (exception != null)
-                            log.error("Auto offset commit failed.", exception);
-                    }
-                });
-                client.schedule(this, now + interval);
+    private class AutoCommitTask implements DelayedTask {
+        private final long interval;
+        private boolean enabled = false;
+        private boolean requestInFlight = false;
+
+        public AutoCommitTask(long interval) {
+            this.interval = interval;
+        }
+
+        public void enable() {
+            if (!enabled) {
+                // there shouldn't be any instances scheduled, but call unschedule anyway to ensure
+                // that this task is only ever scheduled once
+                client.unschedule(this);
+                this.enabled = true;
+
+                if (!requestInFlight) {
+                    long now = time.milliseconds();
+                    client.schedule(this, interval + now);
+                }
+            }
+        }
+
+        public void disable() {
+            this.enabled = false;
+            client.unschedule(this);
+        }
+
+        private void reschedule(long at) {
+            if (enabled)
+                client.schedule(this, at);
+        }
+
+        public void run(final long now) {
+            if (!enabled)
+                return;
+
+            if (coordinatorUnknown()) {
+                log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff");
+                client.schedule(this, now + retryBackoffMs);
+                return;
             }
-        };
-        client.schedule(task, time.milliseconds() + interval);
-        return task;
+
+            requestInFlight = true;
+            commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
+                @Override
+                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
+                    requestInFlight = false;
+                    if (exception == null) {
+                        reschedule(now + interval);
+                    } else if (exception instanceof SendFailedException) {
+                        log.debug("Failed to send automatic offset commit, will retry immediately");
+                        reschedule(now);
+                    } else {
+                        log.warn("Auto offset commit failed: {}", exception.getMessage());
+                        reschedule(now + interval);
+                    }
+                }
+            });
+        }
     }
 
     private void maybeAutoCommitOffsetsSync() {
         if (autoCommitEnabled) {
+            // disable periodic commits prior to committing synchronously. note that they will
+            // be re-enabled after a rebalance completes
+            autoCommitTask.disable();
+
             try {
                 commitOffsetsSync(subscriptions.allConsumed());
             } catch (WakeupException e) {
@@ -388,7 +437,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                 throw e;
             } catch (Exception e) {
                 // consistent with async auto-commit failures, we do not propagate the exception
-                log.error("Auto offset commit failed.", e);
+                log.warn("Auto offset commit failed: {}", e.getMessage());
             }
         }
     }