You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/10 01:17:59 UTC

kafka git commit: KAFKA-4486: Don't commit offsets on exception

Repository: kafka
Updated Branches:
  refs/heads/trunk 9bed8fbcf -> 7c7becd4c


KAFKA-4486: Don't commit offsets on exception

Author: Eno Thereska <en...@gmail.com>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #2225 from enothereska/KAFKA-4486-exception-commit


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c7becd4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c7becd4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c7becd4

Branch: refs/heads/trunk
Commit: 7c7becd4cb88b372d199b05661f8efb1e1d022e6
Parents: 9bed8fb
Author: Eno Thereska <en...@gmail.com>
Authored: Fri Dec 9 17:17:55 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 9 17:17:55 2016 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/StreamThread.java  | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7c7becd4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 96e9963..a2cac71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -198,7 +198,7 @@ public class StreamThread extends Thread {
     final StateDirectory stateDirectory;
 
     private StreamPartitionAssignor partitionAssignor = null;
-
+    private boolean cleanRun = false;
     private long timerStartedMs;
     private long lastCleanMs;
     private long lastCommitMs;
@@ -343,6 +343,7 @@ public class StreamThread extends Thread {
 
         try {
             runLoop();
+            cleanRun = true;
         } catch (KafkaException e) {
             // just re-throw the exception as it should be logged already
             throw e;
@@ -368,6 +369,7 @@ public class StreamThread extends Thread {
         return Collections.unmodifiableMap(activeTasks);
     }
 
+
     private void shutdown() {
         log.info("{} Shutting down", logPrefix);
         shutdownTasksAndState(false);
@@ -414,8 +416,11 @@ public class StreamThread extends Thread {
         log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix,
             activeTasks.keySet(), standbyTasks.keySet());
 
-        // Commit first as there may be cached records that have not been flushed yet.
-        commitOffsets(rethrowExceptions);
+        // only commit under clean exit
+        if (cleanRun) {
+            // Commit first as there may be cached records that have not been flushed yet.
+            commitOffsets(rethrowExceptions);
+        }
         // Close all processors in topology order
         closeAllTasks();
         // flush state