You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/05 17:37:00 UTC
kafka git commit: KAFKA-2742: Fix SourceTaskOffsetCommitter to handle removal of commit tasks when they are already in progress.
Repository: kafka
Updated Branches:
refs/heads/trunk 68f42210a -> 4a9e7607b
KAFKA-2742: Fix SourceTaskOffsetCommitter to handle removal of commit tasks when they are already in progress.
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Guozhang Wang
Closes #421 from ewencp/wait-on-in-progress-source-offset-commits
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4a9e7607
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4a9e7607
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4a9e7607
Branch: refs/heads/trunk
Commit: 4a9e7607b9e563195556873b6ec9b74561cedfbb
Parents: 68f4221
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Nov 5 08:42:44 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 5 08:42:44 2015 -0800
----------------------------------------------------------------------
.../runtime/SourceTaskOffsetCommitter.java | 67 +++++++++++++++-----
1 file changed, 52 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4a9e7607/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
index 20a79ca..6bb51b9 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
@@ -18,11 +18,13 @@
package org.apache.kafka.copycat.runtime;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -39,9 +41,6 @@ import java.util.concurrent.TimeUnit;
* schedule. Instead, this class tracks all the active tasks, their schedule for commits, and
* ensures they are invoked in a timely fashion.
* </p>
- * <p>
- * The current implementation uses a single thread to process commits and
- * </p>
*/
class SourceTaskOffsetCommitter {
private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
@@ -49,7 +48,7 @@ class SourceTaskOffsetCommitter {
private Time time;
private WorkerConfig config;
private ScheduledExecutorService commitExecutorService = null;
- private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();
+ private HashMap<ConnectorTaskId, ScheduledCommitTask> committers = new HashMap<>();
SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
this.time = time;
@@ -69,22 +68,43 @@ class SourceTaskOffsetCommitter {
}
public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
- long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
- ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- commit(workerTask);
- }
- }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
- committers.put(id, commitFuture);
+ synchronized (committers) {
+ long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ ScheduledFuture<?> commitFuture = commitExecutorService.schedule(new Runnable() {
+ @Override
+ public void run() {
+ commit(id, workerTask);
+ }
+ }, commitIntervalMs, TimeUnit.MILLISECONDS);
+ committers.put(id, new ScheduledCommitTask(commitFuture));
+ }
}
public void remove(ConnectorTaskId id) {
- ScheduledFuture<?> commitFuture = committers.remove(id);
- commitFuture.cancel(false);
+ final ScheduledCommitTask task;
+ synchronized (committers) {
+ task = committers.remove(id);
+ task.cancelled = true;
+ task.commitFuture.cancel(false);
+ }
+ if (task.finishedLatch != null) {
+ try {
+ task.finishedLatch.await();
+ } catch (InterruptedException e) {
+ throw new CopycatException("Unexpected interruption in SourceTaskOffsetCommitter.", e);
+ }
+ }
}
- public void commit(WorkerSourceTask workerTask) {
+ public void commit(ConnectorTaskId id, WorkerSourceTask workerTask) {
+ final ScheduledCommitTask task;
+ synchronized (committers) {
+ task = committers.get(id);
+ if (task == null || task.cancelled)
+ return;
+ task.finishedLatch = new CountDownLatch(1);
+ }
+
try {
log.debug("Committing offsets for {}", workerTask);
boolean success = workerTask.commitOffsets();
@@ -96,7 +116,24 @@ class SourceTaskOffsetCommitter {
// thread would cause the fixed interval schedule on the ExecutorService to stop running
// for that task
log.error("Unhandled exception when committing {}: ", workerTask, t);
+ } finally {
+ synchronized (committers) {
+ task.finishedLatch.countDown();
+ if (!task.cancelled)
+ schedule(id, workerTask);
+ }
}
}
+ private static class ScheduledCommitTask {
+ ScheduledFuture<?> commitFuture;
+ boolean cancelled;
+ CountDownLatch finishedLatch;
+
+ ScheduledCommitTask(ScheduledFuture<?> commitFuture) {
+ this.commitFuture = commitFuture;
+ this.cancelled = false;
+ this.finishedLatch = null;
+ }
+ }
}