You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ji...@apache.org on 2017/07/17 14:31:57 UTC
flink git commit: [FLINK-7101][table] Add a !stateCleaningEnabled
condition to prevent non-grouped window state from being cleaned up
too early
Repository: flink
Updated Branches:
refs/heads/master 527e7499c -> 1125122a7
[FLINK-7101][table] Add a !stateCleaningEnabled condition to prevent non-grouped window state from being cleaned up too early
This closes #4348.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1125122a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1125122a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1125122a
Branch: refs/heads/master
Commit: 1125122a75d25c3d3aa55d7f51d84ed25ee69c56
Parents: 527e749
Author: sunjincheng121 <su...@gmail.com>
Authored: Sat Jul 15 19:43:30 2017 +0800
Committer: Jincheng Sun <ji...@apache.org>
Committed: Mon Jul 17 22:28:34 2017 +0800
----------------------------------------------------------------------
.../table/runtime/aggregate/GroupAggProcessFunction.scala | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/1125122a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 57ea86e..690a7c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -131,8 +131,11 @@ class GroupAggProcessFunction(
// if this was not the first row and we have to emit retractions
if (generateRetraction && !firstRow) {
- if (prevRow.row.equals(newRow.row)) {
- // newRow is the same as before. Do not emit retraction and acc messages
+ if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
+ // newRow is the same as before and state cleaning is not enabled.
+ // We do not emit retraction and acc message.
+ // If state cleaning is enabled, we have to emit messages to prevent too early
+ // state eviction of downstream operators.
return
} else {
// retract previous result