You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/11/04 20:59:52 UTC

kafka git commit: KAFKA-2744: Commit source task offsets after task is completely stopped

Repository: kafka
Updated Branches:
  refs/heads/trunk c39e79bb5 -> 70a784b64


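KAFKA-2744: Commit source task offsets after task is completely stopped

The commitOffsets() call is moved out of WorkerSourceTask.stop() and to the end of WorkerSourceTask.awaitStop(), so offsets are committed only after the wait for the work thread to shut down has finished, rather than while the work thread may still be running.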

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Guozhang Wang

Closes #423 from ewencp/commit-source-offsets-after-work-thread-exits


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/70a784b6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/70a784b6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/70a784b6

Branch: refs/heads/trunk
Commit: 70a784b64ab61bcd517619fed44419d59d467b27
Parents: c39e79b
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Nov 4 12:05:34 2015 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Nov 4 12:05:34 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/copycat/runtime/WorkerSourceTask.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/70a784b6/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index ea9e6b5..78b588c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -96,24 +96,24 @@ class WorkerSourceTask implements WorkerTask {
     @Override
     public void stop() {
         task.stop();
-        commitOffsets();
         if (workThread != null)
             workThread.startGracefulShutdown();
     }
 
     @Override
     public boolean awaitStop(long timeoutMs) {
+        boolean success = true;
         if (workThread != null) {
             try {
-                boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+                success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
                 if (!success)
                     workThread.forceShutdown();
-                return success;
             } catch (InterruptedException e) {
-                return false;
+                success = false;
             }
         }
-        return true;
+        commitOffsets();
+        return success;
     }
 
     @Override