You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/10 21:09:58 UTC

[2/5] flink git commit: [FLINK-7776] [table] Prevent emission of identical update records in group aggregation.

[FLINK-7776] [table] Prevent emission of identical update records in group aggregation.

This closes #4785.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4047be49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4047be49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4047be49

Branch: refs/heads/master
Commit: 4047be49e10cacc5e4ce932a0b8433afffa82a58
Parents: 1ea7f49
Author: Xpray <le...@gmail.com>
Authored: Mon Oct 9 18:19:01 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 10 23:09:07 2017 +0200

----------------------------------------------------------------------
 .../aggregate/GroupAggProcessFunction.scala     | 10 ++++----
 .../runtime/stream/table/AggregateITCase.scala  | 25 ++++++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4047be49/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index df59460..91c379f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -129,17 +129,19 @@ class GroupAggProcessFunction(
       state.update(accumulators)
       cntState.update(inputCnt)
 
-      // if this was not the first row and we have to emit retractions
-      if (generateRetraction && !firstRow) {
+      // if this was not the first row
+      if (!firstRow) {
         if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
           // newRow is the same as before and state cleaning is not enabled.
-          // We do not emit retraction and acc message.
+          // We emit nothing
           // If state cleaning is enabled, we have to emit messages to prevent too early
           // state eviction of downstream operators.
           return
         } else {
           // retract previous result
-          out.collect(prevRow)
+          if (generateRetraction) {
+            out.collect(prevRow)
+          }
         }
       }
       // emit the new result

http://git-wip-us.apache.org/repos/asf/flink/blob/4047be49/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index eb3d37f..e67c784 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -193,4 +193,29 @@ class AggregateITCase extends StreamingWithStateTestBase {
     // verify agg close is called
     assert(JavaUserDefinedAggFunctions.isCloseCalled)
   }
+
+  @Test
+  def testRemoveDuplicateRecordsWithUpsertSink(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStateBackend(getStateBackend)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, Long, String)]
+    data.+=((1, 1L, "A"))
+    data.+=((2, 2L, "B"))
+    data.+=((3, 2L, "B"))
+    data.+=((4, 3L, "C"))
+    data.+=((5, 3L, "C"))
+
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+            .groupBy('c)
+            .select('c, 'b.max)
+
+    t.writeToSink(new TestUpsertSink(Array("c"), false))
+    env.execute()
+
+    val expected = List("(true,A,1)", "(true,B,2)", "(true,C,3)")
+    assertEquals(expected.sorted, RowCollector.getAndClearValues.map(_.toString).sorted)
+  }
 }