You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/14 00:26:52 UTC

kafka git commit: MINOR: flush record collector after local state flush

Repository: kafka
Updated Branches:
  refs/heads/trunk 123d27e4d -> b1ce9494e


MINOR: flush record collector after local state flush

guozhangwang
Fix the order of flushing. Undoing the change I did sometime ago.

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #304 from ymatsuda/flush_order


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1ce9494
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1ce9494
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1ce9494

Branch: refs/heads/trunk
Commit: b1ce9494e3a964613d3d9534471df79593514c77
Parents: 123d27e
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Oct 13 15:31:27 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 13 15:31:27 2015 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/processor/internals/StreamTask.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b1ce9494/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6afa427..a94202f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -245,12 +245,12 @@ public class StreamTask implements Punctuator {
      * Commit the current task state
      */
     public void commit() {
-        // 1) flush produced records in the downstream and change logs of local states
-        recordCollector.flush();
-
-        // 2) flush local state
+        // 1) flush local state
         stateMgr.flush();
 
+        // 2) flush produced records in the downstream and change logs of local states
+        recordCollector.flush();
+
         // 3) commit consumed offsets if it is dirty already
         if (commitOffsetNeeded) {
             Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());