You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/23 06:39:29 UTC

kafka git commit: MINOR: catch a commit failure due to rebalance in StreamThread

Repository: kafka
Updated Branches:
  refs/heads/trunk 982ab09a7 -> 68af16ac1


MINOR: catch a commit failure due to rebalance in StreamThread

StreamThread should keep going after a commit was failed due to a group rebalance.
Currently the thread just dies.
guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #933 from ymatsuda/catch_commit_failure


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/68af16ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/68af16ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/68af16ac

Branch: refs/heads/trunk
Commit: 68af16ac15e5675daebb710ed8f15f780dc43abd
Parents: 982ab09
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Mon Feb 22 21:39:26 2016 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Mon Feb 22 21:39:26 2016 -0800

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java  | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/68af16ac/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 70e24d0..10e458a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -487,7 +488,11 @@ public class StreamThread extends Thread {
     private void commitOne(AbstractTask task, long now) {
         try {
             task.commit();
+        } catch (CommitFailedException e) {
+            // commit failed. Just log it.
+            log.warn("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
         } catch (KafkaException e) {
+            // commit failed due to an unexpected exception. Log it and rethrow the exception.
             log.error("Failed to commit " + task.getClass().getSimpleName() + " #" + task.id() + " in thread [" + this.getName() + "]: ", e);
             throw e;
         }