You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/12/10 01:17:59 UTC
kafka git commit: KAFKA-4486: Don't commit offsets on exception
Repository: kafka
Updated Branches:
refs/heads/trunk 9bed8fbcf -> 7c7becd4c
KAFKA-4486: Don't commit offsets on exception
Author: Eno Thereska <en...@gmail.com>
Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang
Closes #2225 from enothereska/KAFKA-4486-exception-commit
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7c7becd4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7c7becd4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7c7becd4
Branch: refs/heads/trunk
Commit: 7c7becd4cb88b372d199b05661f8efb1e1d022e6
Parents: 9bed8fb
Author: Eno Thereska <en...@gmail.com>
Authored: Fri Dec 9 17:17:55 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Fri Dec 9 17:17:55 2016 -0800
----------------------------------------------------------------------
.../kafka/streams/processor/internals/StreamThread.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7c7becd4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 96e9963..a2cac71 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -198,7 +198,7 @@ public class StreamThread extends Thread {
final StateDirectory stateDirectory;
private StreamPartitionAssignor partitionAssignor = null;
-
+ private boolean cleanRun = false;
private long timerStartedMs;
private long lastCleanMs;
private long lastCommitMs;
@@ -343,6 +343,7 @@ public class StreamThread extends Thread {
try {
runLoop();
+ cleanRun = true;
} catch (KafkaException e) {
// just re-throw the exception as it should be logged already
throw e;
@@ -368,6 +369,7 @@ public class StreamThread extends Thread {
return Collections.unmodifiableMap(activeTasks);
}
+
private void shutdown() {
log.info("{} Shutting down", logPrefix);
shutdownTasksAndState(false);
@@ -414,8 +416,11 @@ public class StreamThread extends Thread {
log.debug("{} shutdownTasksAndState: shutting down all active tasks [{}] and standby tasks [{}]", logPrefix,
activeTasks.keySet(), standbyTasks.keySet());
- // Commit first as there may be cached records that have not been flushed yet.
- commitOffsets(rethrowExceptions);
+ // only commit under clean exit
+ if (cleanRun) {
+ // Commit first as there may be cached records that have not been flushed yet.
+ commitOffsets(rethrowExceptions);
+ }
// Close all processors in topology order
closeAllTasks();
// flush state