You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 20:59:52 UTC
kafka git commit: KAFKA-2744: Commit source task offsets after task is completely stopped
Repository: kafka
Updated Branches:
refs/heads/trunk c39e79bb5 -> 70a784b64
KAFKA-2744: Commit source task offsets after task is completely stopped
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Reviewers: Guozhang Wang
Closes #423 from ewencp/commit-source-offsets-after-work-thread-exits
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70a784b6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70a784b6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70a784b6
Branch: refs/heads/trunk
Commit: 70a784b64ab61bcd517619fed44419d59d467b27
Parents: c39e79b
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Nov 4 12:05:34 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 12:05:34 2015 -0800
----------------------------------------------------------------------
.../apache/kafka/copycat/runtime/WorkerSourceTask.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/70a784b6/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index ea9e6b5..78b588c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -96,24 +96,24 @@ class WorkerSourceTask implements WorkerTask {
@Override
public void stop() {
task.stop();
- commitOffsets();
if (workThread != null)
workThread.startGracefulShutdown();
}
@Override
public boolean awaitStop(long timeoutMs) {
+ boolean success = true;
if (workThread != null) {
try {
- boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+ success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
if (!success)
workThread.forceShutdown();
- return success;
} catch (InterruptedException e) {
- return false;
+ success = false;
}
}
- return true;
+ commitOffsets();
+ return success;
}
@Override