You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/10/14 00:26:52 UTC
kafka git commit: MINOR: flush record collector after local state
flush
Repository: kafka
Updated Branches:
refs/heads/trunk 123d27e4d -> b1ce9494e
MINOR: flush record collector after local state flush
guozhangwang
Fix the order of flushing. Undoing the change I did sometime ago.
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #304 from ymatsuda/flush_order
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b1ce9494
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b1ce9494
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b1ce9494
Branch: refs/heads/trunk
Commit: b1ce9494e3a964613d3d9534471df79593514c77
Parents: 123d27e
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Oct 13 15:31:27 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Oct 13 15:31:27 2015 -0700
----------------------------------------------------------------------
.../apache/kafka/streams/processor/internals/StreamTask.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b1ce9494/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6afa427..a94202f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -245,12 +245,12 @@ public class StreamTask implements Punctuator {
* Commit the current task state
*/
public void commit() {
- // 1) flush produced records in the downstream and change logs of local states
- recordCollector.flush();
-
- // 2) flush local state
+ // 1) flush local state
stateMgr.flush();
+ // 2) flush produced records in the downstream and change logs of local states
+ recordCollector.flush();
+
// 3) commit consumed offsets if it is dirty already
if (commitOffsetNeeded) {
Map<TopicPartition, OffsetAndMetadata> consumedOffsetsAndMetadata = new HashMap<>(consumedOffsets.size());