Posted to commits@flink.apache.org by ji...@apache.org on 2017/07/17 14:31:57 UTC

flink git commit: [FLINK-7101][table] Add !stateCleaningEnabled condition to avoid cleaning up non-grouped window state too early

Repository: flink
Updated Branches:
  refs/heads/master 527e7499c -> 1125122a7


[FLINK-7101][table] Add !stateCleaningEnabled condition to avoid cleaning up non-grouped window state too early

This closes #4348.
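
For readers skimming the diff, the changed condition can be modeled roughly as below. This is only a minimal sketch, not the actual GroupAggProcessFunction code: the EmitDecision object, the Action values, and the rowUnchanged parameter (standing in for prevRow.row.equals(newRow.row)) are hypothetical names introduced for illustration. What happens when no retraction is needed is not shown in the diff, so the EmitNewRowOnly case is an assumption.

```scala
// Hypothetical, simplified model of the emit decision touched by this commit.
// It is not the Flink implementation; names other than the three flags
// generateRetraction, firstRow and stateCleaningEnabled are made up.
object EmitDecision {
  sealed trait Action
  case object Skip extends Action                  // emit nothing
  case object RetractThenAccumulate extends Action // retract old result, emit new one
  case object EmitNewRowOnly extends Action        // assumed behavior outside the branch

  def decide(
      generateRetraction: Boolean,
      firstRow: Boolean,
      rowUnchanged: Boolean,          // stands in for prevRow.row.equals(newRow.row)
      stateCleaningEnabled: Boolean): Action = {
    if (generateRetraction && !firstRow) {
      // Before the commit the check was only `rowUnchanged`.
      // With state cleaning enabled, messages are still emitted so that
      // downstream operators do not evict their state too early.
      if (rowUnchanged && !stateCleaningEnabled) Skip
      else RetractThenAccumulate
    } else {
      EmitNewRowOnly
    }
  }
}
```

Under this model, an unchanged row is skipped only when state cleaning is disabled; with state cleaning enabled, the same input yields a retraction followed by a new accumulate message.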


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1125122a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1125122a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1125122a

Branch: refs/heads/master
Commit: 1125122a75d25c3d3aa55d7f51d84ed25ee69c56
Parents: 527e749
Author: sunjincheng121 <su...@gmail.com>
Authored: Sat Jul 15 19:43:30 2017 +0800
Committer: Jincheng Sun <ji...@apache.org>
Committed: Mon Jul 17 22:28:34 2017 +0800

----------------------------------------------------------------------
 .../table/runtime/aggregate/GroupAggProcessFunction.scala     | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1125122a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index 57ea86e..690a7c3 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -131,8 +131,11 @@ class GroupAggProcessFunction(
 
       // if this was not the first row and we have to emit retractions
       if (generateRetraction && !firstRow) {
-        if (prevRow.row.equals(newRow.row)) {
-          // newRow is the same as before. Do not emit retraction and acc messages
+        if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
+          // newRow is the same as before and state cleaning is not enabled.
+          // We do not emit retraction and acc message.
+          // If state cleaning is enabled, we have to emit messages to prevent too early
+          // state eviction of downstream operators.
           return
         } else {
           // retract previous result