You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/05 17:37:00 UTC

kafka git commit: KAFKA-2742: Fix SourceTaskOffsetCommitter to handle removal of commit tasks when they are already in progress.

Repository: kafka
Updated Branches:
  refs/heads/trunk 68f42210a -> 4a9e7607b


KAFKA-2742: Fix SourceTaskOffsetCommitter to handle removal of commit tasks when they are already in progress.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Guozhang Wang

Closes #421 from ewencp/wait-on-in-progress-source-offset-commits


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a9e7607
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a9e7607
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a9e7607

Branch: refs/heads/trunk
Commit: 4a9e7607b9e563195556873b6ec9b74561cedfbb
Parents: 68f4221
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 5 08:42:44 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 5 08:42:44 2015 -0800

----------------------------------------------------------------------
 .../runtime/SourceTaskOffsetCommitter.java      | 67 +++++++++++++++-----
 1 file changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4a9e7607/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
index 20a79ca..6bb51b9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
@@ -18,11 +18,13 @@
 package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -39,9 +41,6 @@ import java.util.concurrent.TimeUnit;
  * schedule. Instead, this class tracks all the active tasks, their schedule for commits, and
  * ensures they are invoked in a timely fashion.
  * </p>
- * <p>
- *   The current implementation uses a single thread to process commits and
- * </p>
  */
 class SourceTaskOffsetCommitter {
     private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
@@ -49,7 +48,7 @@ class SourceTaskOffsetCommitter {
     private Time time;
     private WorkerConfig config;
     private ScheduledExecutorService commitExecutorService = null;
-    private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();
+    private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
 
     SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
         this.time = time;
@@ -69,22 +68,43 @@ class SourceTaskOffsetCommitter {
     }
 
     public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
-        long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
-        ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
-            @Override
-            public void run() {
-                commit(workerTask);
-            }
-        }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
-        committers.put(id, commitFuture);
+        synchronized (committers) {
+            long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+            ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    commit(id, workerTask);
+                }
+            }, commitIntervalMs, TimeUnit.MILLISECONDS);
+            committers.put(id, new ScheduledCommitTask(commitFuture));
+        }
     }
 
     public void remove(ConnectorTaskId id) {
-        ScheduledFuture<?> commitFuture = committers.remove(id);
-        commitFuture.cancel(false);
+        final ScheduledCommitTask task;
+        synchronized (committers) {
+            task = committers.remove(id);
+            task.cancelled = true;
+            task.commitFuture.cancel(false);
+        }
+        if (task.finishedLatch != null) {
+            try {
+                task.finishedLatch.await();
+            } catch (InterruptedException e) {
+                throw new CopycatException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
+            }
+        }
     }
 
-    public void commit(WorkerSourceTask workerTask) {
+    public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
+        final ScheduledCommitTask task;
+        synchronized (committers) {
+            task = committers.get(id);
+            if (task == null || task.cancelled)
+                return;
+            task.finishedLatch = new CountDownLatch(1);
+        }
+
         try {
             log.debug("Committing offsets for {}", workerTask);
             boolean success = workerTask.commitOffsets();
@@ -96,7 +116,24 @@ class SourceTaskOffsetCommitter {
             // thread would cause the fixed interval schedule on the ExecutorService to stop running
             // for that task
             log.error("Unhandled exception when committing {}: ", workerTask, t);
+        } finally {
+            synchronized (committers) {
+                task.finishedLatch.countDown();
+                if (!task.cancelled)
+                    schedule(id, workerTask);
+            }
         }
     }
 
+    private static class ScheduledCommitTask {
+        ScheduledFuture<?> commitFuture;
+        boolean cancelled;
+        CountDownLatch finishedLatch;
+
+        ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
+            this.commitFuture = commitFuture;
+            this.cancelled = false;
+            this.finishedLatch = null;
+        }
+    }
 }