You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/04/20 21:34:48 UTC

kafka git commit: KAFKA-5073: Kafka Streams stuck rebalancing after exception thrown in rebalance listener

Repository: kafka
Updated Branches:
  refs/heads/trunk 3c471d25b -> 492306257


KAFKA-5073: Kafka Streams stuck rebalancing after exception thrown in rebalance listener

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Ismael Juma, Eno Thereska, Guozhang Wang

Closes #2856 from mjsax/kafka-5073


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49230625
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49230625
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49230625

Branch: refs/heads/trunk
Commit: 49230625736d30cb4d0467ac0a103b23de5c266b
Parents: 3c471d2
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Thu Apr 20 14:34:45 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Apr 20 14:34:45 2017 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamThread.java   | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49230625/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index cd78a85..7af6bad 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -570,6 +570,10 @@ public class StreamThread extends Thread {
             resetInvalidOffsets(e);
         }
 
+        if (rebalanceException != null) {
+            throw new StreamsException(logPrefix + " Failed to rebalance.", rebalanceException);
+        }
+
         return records;
     }