You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2017/04/04 22:12:38 UTC

kafka git commit: MINOR: don't throw CommitFailedException during suspendTasksAndState

Repository: kafka
Updated Branches:
  refs/heads/0.10.2 8c0005740 -> 444110eac


MINOR: don't throw CommitFailedException during suspendTasksAndState

Cherrypicked from trunk https://github.com/apache/kafka/pull/2535

Author: Eno Thereska <en...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #2804 from enothereska/CommitFailedException-0.10.2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/444110ea
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/444110ea
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/444110ea

Branch: refs/heads/0.10.2
Commit: 444110eacb88a5f6cffbdb3a773f5d50eca2aea8
Parents: 8c00057
Author: Eno Thereska <en...@confluent.io>
Authored: Tue Apr 4 15:12:25 2017 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Apr 4 15:12:25 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/444110ea/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 70c1392..d053431 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -478,7 +478,9 @@ public class StreamThread extends Thread {
         firstException.compareAndSet(null, flushAllState());
         // only commit after all state has been flushed and there hasn't been an exception
         if (firstException.get() == null) {
-            firstException.set(commitOffsets());
+            // TODO: currently commit failures will not be thrown to users
+            // while suspending tasks; this needs to be revisited after KIP-98
+            commitOffsets();
         }
         // remove the changelog partitions from restore consumer
         firstException.compareAndSet(null, unAssignChangeLogPartitions());