Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 19:02:25 UTC

kafka git commit: HOTFIX: Fix NPE after standby task reassignment

Repository: kafka
Updated Branches:
  refs/heads/trunk 9f5a1f876 -> 6352a30f4


HOTFIX: Fix NPE after standby task reassignment

Buffered records of change logs must be cleared upon reassignment of standby tasks.
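
The failure mode can be illustrated with a minimal sketch. This is a simplified model, not the actual StreamThread code: the class, the Task type, the string partition names, and every method here are hypothetical stand-ins. It assumes the NPE arises when a record buffered for a previously assigned changelog partition is looked up after that partition's standby task has been removed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of standby-task bookkeeping; not Kafka's implementation.
class StandbyReassignmentSketch {
    // Hypothetical stand-in for a standby task that applies changelog records.
    static final class Task {
        final List<String> applied = new ArrayList<>();
        void update(List<String> records) { applied.addAll(records); }
    }

    // Partition name -> owning task (models standbyTasksByPartition).
    final Map<String, Task> standbyTasksByPartition = new HashMap<>();
    // Partition name -> records fetched but not yet applied (models standbyRecords).
    final Map<String, List<String>> standbyRecords = new HashMap<>();

    void assign(String partition) {
        standbyTasksByPartition.put(partition, new Task());
    }

    void buffer(String partition, String record) {
        standbyRecords.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    // Drops all standby tasks on reassignment; clearBuffered models the one-line fix.
    void removeStandbyTasks(boolean clearBuffered) {
        standbyTasksByPartition.clear();
        if (clearBuffered) {
            standbyRecords.clear();
        }
    }

    // Hands each partition's buffered records to its task and returns the record count.
    int applyBuffered() {
        int count = 0;
        for (Map.Entry<String, List<String>> entry : standbyRecords.entrySet()) {
            Task task = standbyTasksByPartition.get(entry.getKey());
            task.update(entry.getValue()); // throws NullPointerException if the task is gone
            count += entry.getValue().size();
        }
        standbyRecords.clear();
        return count;
    }

    // Runs one reassignment scenario and reports whether applying the buffer hit an NPE.
    static boolean throwsNpe(boolean clearBuffered) {
        StandbyReassignmentSketch thread = new StandbyReassignmentSketch();
        thread.assign("changelog-0");
        thread.buffer("changelog-0", "r1");
        thread.removeStandbyTasks(clearBuffered); // reassignment drops the old task
        thread.assign("changelog-1");             // a different partition is assigned
        try {
            thread.applyBuffered();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("buffer kept on reassignment:    NPE=" + throwsNpe(false));
        System.out.println("buffer cleared on reassignment: NPE=" + throwsNpe(true));
    }
}
```

In this model, clearing the buffer in the same place the task maps are cleared keeps the three structures consistent, which is the shape of the one-line change in this commit.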

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #889 from ymatsuda/hotfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6352a30f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6352a30f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6352a30f

Branch: refs/heads/trunk
Commit: 6352a30f46f2da11a8dc3e58912d0a2db8284c35
Parents: 9f5a1f8
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 10:02:20 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 9 10:02:20 2016 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/streams/processor/internals/StreamThread.java  | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6352a30f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 6a8eabc..d51974a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -682,6 +682,7 @@ public class StreamThread extends Thread {
 
         standbyTasks.clear();
         standbyTasksByPartition.clear();
+        standbyRecords.clear();
     }
 
     private void ensureCopartitioning(Collection<Set<String>> copartitionGroups) {