You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text view.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/10/10 21:09:58 UTC
[2/5] flink git commit: [FLINK-7776] [table] Prevent emission of identical update records in group aggregation.
[FLINK-7776] [table] Prevent emission of identical update records in group aggregation.
This closes #4785.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4047be49
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4047be49
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4047be49
Branch: refs/heads/master
Commit: 4047be49e10cacc5e4ce932a0b8433afffa82a58
Parents: 1ea7f49
Author: Xpray <le...@gmail.com>
Authored: Mon Oct 9 18:19:01 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Oct 10 23:09:07 2017 +0200
----------------------------------------------------------------------
.../aggregate/GroupAggProcessFunction.scala | 10 ++++----
.../runtime/stream/table/AggregateITCase.scala | 25 ++++++++++++++++++++
2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4047be49/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
index df59460..91c379f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
@@ -129,17 +129,19 @@ class GroupAggProcessFunction(
state.update(accumulators)
cntState.update(inputCnt)
- // if this was not the first row and we have to emit retractions
- if (generateRetraction && !firstRow) {
+ // if this was not the first row
+ if (!firstRow) {
if (prevRow.row.equals(newRow.row) && !stateCleaningEnabled) {
// newRow is the same as before and state cleaning is not enabled.
- // We do not emit retraction and acc message.
+ // We emit nothing
// If state cleaning is enabled, we have to emit messages to prevent too early
// state eviction of downstream operators.
return
} else {
// retract previous result
- out.collect(prevRow)
+ if (generateRetraction) {
+ out.collect(prevRow)
+ }
}
}
// emit the new result
http://git-wip-us.apache.org/repos/asf/flink/blob/4047be49/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index eb3d37f..e67c784 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -193,4 +193,29 @@ class AggregateITCase extends StreamingWithStateTestBase {
// verify agg close is called
assert(JavaUserDefinedAggFunctions.isCloseCalled)
}
+
+ @Test
+ def testRemoveDuplicateRecordsWithUpsertSink(): Unit = { // unchanged max(b) per key must be emitted only once
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear // NOTE(review): results are read via RowCollector below; confirm this clear is needed
+
+ val data = new mutable.MutableList[(Int, Long, String)]
+ data.+=((1, 1L, "A"))
+ data.+=((2, 2L, "B"))
+ data.+=((3, 2L, "B")) // same b as the previous "B" row, so max(b) for "B" stays 2
+ data.+=((4, 3L, "C"))
+ data.+=((5, 3L, "C")) // same b as the previous "C" row, so max(b) for "C" stays 3
+
+ val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('c)
+ .select('c, 'b.max)
+
+ t.writeToSink(new TestUpsertSink(Array("c"), false)) // keyed on "c"; NOTE(review): boolean presumably the append-only flag - confirm
+ env.execute()
+
+ val expected = List("(true,A,1)", "(true,B,2)", "(true,C,3)") // one record per key; repeated identical B/C results are not expected
+ assertEquals(expected.sorted, RowCollector.getAndClearValues.map(_.toString).sorted)
+ }
}