You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/12 14:55:22 UTC
[flink] branch release-1.12 updated: [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.12 by this push:
new 1345c0f [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API
1345c0f is described below
commit 1345c0f9a606a6e5ccffda59bb28a6ccfe054263
Author: Jane <55...@users.noreply.github.com>
AuthorDate: Fri Feb 12 22:54:59 2021 +0800
[FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API
This closes #14917
---
.../expressions/converter/OverConvertRule.java | 16 +-
.../planner/plan/stream/table/OverWindowTest.xml | 68 +++++++
.../planner/plan/stream/table/OverWindowTest.scala | 49 +++++
.../runtime/stream/table/OverWindowITCase.scala | 220 +++++++++++++++++++++
4 files changed, 351 insertions(+), 2 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
index 220f19f..62df20d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
@@ -70,6 +70,9 @@ public class OverConvertRule implements CallExpressionConvertRule {
if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.OVER) {
FlinkTypeFactory typeFactory = context.getTypeFactory();
Expression agg = children.get(0);
+ FunctionDefinition def = ((CallExpression) agg).getFunctionDefinition();
+ boolean isDistinct = BuiltInFunctionDefinitions.DISTINCT == def;
+
SqlAggFunction aggFunc = agg.accept(new SqlAggFunctionVisitor(context.getRelBuilder()));
RelDataType aggResultType =
typeFactory.createFieldTypeFromLogicalType(
@@ -78,7 +81,16 @@ public class OverConvertRule implements CallExpressionConvertRule {
// assemble exprs by agg children
List<RexNode> aggExprs =
- agg.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());
+ agg.getChildren().stream()
+ .map(
+ child -> {
+ if (isDistinct) {
+ return context.toRexNode(child.getChildren().get(0));
+ } else {
+ return context.toRexNode(child);
+ }
+ })
+ .collect(Collectors.toList());
// assemble order by key
Expression orderKeyExpr = children.get(1);
@@ -123,7 +135,7 @@ public class OverConvertRule implements CallExpressionConvertRule {
isPhysical,
true,
false,
- false));
+ isDistinct));
}
return Optional.empty();
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
index 2f3f38d..1df07c7 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
@@ -152,6 +152,40 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2])
]]>
</Resource>
</TestCase>
+ <TestCase name="testRowTimeBoundedDistinctWithPartitionedRangeOver">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRowTimeBoundedDistinctWithPartitionedRowsOver">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testRowTimeBoundedNonPartitionedRangeOver">
<Resource name="planBefore">
<![CDATA[
@@ -220,6 +254,40 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS wAvg])
]]>
</Resource>
</TestCase>
+ <TestCase name="testRowTimeUnboundedDistinctWithPartitionedRangeOver">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testRowTimeUnboundedDistinctWithPartitionedRowsOver">
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+ +- Exchange(distribution=[hash[c]])
+ +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testRowTimeUnboundedNonPartitionedRangeOver">
<Resource name="planBefore">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
index cec69be..4466091 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
@@ -189,6 +189,55 @@ class OverWindowTest extends TableTestBase {
}
@Test
+ def testRowTimeBoundedDistinctWithPartitionedRangeOver(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2.hours following CURRENT_RANGE as 'w)
+ .select('c,
+ 'a.count.distinct over 'w,
+ 'a.sum.distinct over 'w,
+ ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+ streamUtil.verifyPlan(result)
+ }
+
+ @Test
+ def testRowTimeUnboundedDistinctWithPartitionedRangeOver(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+ .select('c,
+ 'a.count.distinct over 'w,
+ 'a.sum.distinct over 'w,
+ ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+ streamUtil.verifyPlan(result)
+ }
+
+ @Test
+ def testRowTimeBoundedDistinctWithPartitionedRowsOver(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+ .select('c,
+ 'a.count.distinct over 'w,
+ 'a.sum.distinct over 'w,
+ ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+ streamUtil.verifyPlan(result)
+ }
+
+ @Test
+ def testRowTimeUnboundedDistinctWithPartitionedRowsOver(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+ CURRENT_ROW as 'w)
+ .select('c,
+ 'a.count.distinct over 'w,
+ 'a.sum.distinct over 'w,
+ ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+ streamUtil.verifyPlan(result)
+ }
+
+ @Test
def testRowTimeUnboundedPartitionedRowsOver() = {
val weightedAvg = new WeightedAvgWithRetract
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
index 8ed3692..2104f75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
@@ -194,6 +194,226 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
}
@Test
+ def testRowTimeBoundedDistinctPartitionedRangeOver(): Unit = {
+ val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000001L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val source = failingDataSource(data)
+ val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+ .setParallelism(source.parallelism)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Over partitionBy 'c orderBy 'rowtime
+ preceding 1.seconds following CURRENT_RANGE as 'w)
+ .select(
+ 'c,
+ 'b.count.distinct over 'w,
+ 'b.sum.distinct over 'w,
+ ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+ val sink = new TestingAppendSink
+ windowedTable.toAppendStream[Row].addSink(sink)
+ env.execute()
+ val expected = Seq(
+ "Hello,1,1,1.0",
+ "Hello,1,1,1.0",
+ "Hello,2,3,1.5",
+ "Hello world,1,3,3.0",
+ "Hello world,2,5,2.5",
+ "Hello world,2,5,2.5",
+ "Hi,1,1,1.0",
+ "Hello world,3,9,3.0",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testRowTimeUnBoundedDistinctPartitionedRangeOver(): Unit = {
+ val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000001L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val source = failingDataSource(data)
+ val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+ .setParallelism(source.parallelism)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+ .select(
+ 'c,
+ 'b.count.distinct over 'w,
+ 'b.sum.distinct over 'w,
+ ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w
+ )
+
+ val sink = new TestingAppendSink
+ windowedTable.toAppendStream[Row].addSink(sink)
+ env.execute()
+ val expected = Seq(
+ "Hello,1,1,1.0",
+ "Hello,1,1,1.0",
+ "Hello,2,3,1.5",
+ "Hello world,1,3,3.0",
+ "Hello world,2,5,2.5",
+ "Hello world,2,5,2.5",
+ "Hi,1,1,1.0",
+ "Hello world,3,9,3.0",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testRowTimeBoundedDistinctPartitionedRowsOver(): Unit = {
+ val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000001L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val source = failingDataSource(data)
+ val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+ .setParallelism(source.parallelism)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+ .select(
+ 'c,
+ 'b.count.distinct over 'w,
+ 'b.sum.distinct over 'w,
+ ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+ val sink = new TestingAppendSink
+ windowedTable.toAppendStream[Row].addSink(sink)
+ env.execute()
+ val expected = Seq(
+ "Hello,1,1,1.0",
+ "Hello,1,1,1.0",
+ "Hello,2,3,1.5",
+ "Hello world,1,3,3.0",
+ "Hello world,2,5,2.5",
+ "Hello world,2,5,2.5",
+ "Hi,1,1,1.0",
+ "Hello world,3,9,3.0",
+ "Hello world,3,12,4.0",
+ "Hello world,3,15,5.0",
+ "Hello world,3,16,5.3333335",
+ "Hello world,3,17,5.6666665",
+ "Hello world,3,18,6.0"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+ @Test
+ def testRowTimeUnBoundedDistinctPartitionedRowsOver(): Unit = {
+ val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000001L, (1, 1L, "Hello")),
+ Left(14000002L, (1, 2L, "Hello")),
+ Left(14000002L, (1, 3L, "Hello world")),
+ Left(14000003L, (2, 2L, "Hello world")),
+ Left(14000003L, (2, 3L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (1, 4L, "Hello world")),
+ Left(14000022L, (1, 5L, "Hello world")),
+ Left(14000022L, (1, 6L, "Hello world")),
+ Left(14000022L, (1, 7L, "Hello world")),
+ Left(14000023L, (2, 4L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val source = failingDataSource(data)
+ val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+ .setParallelism(source.parallelism)
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+ CURRENT_ROW as 'w)
+ .select(
+ 'c,
+ 'b.count.distinct over 'w,
+ 'b.sum.distinct over 'w,
+ ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+ val sink = new TestingAppendSink
+ windowedTable.toAppendStream[Row].addSink(sink)
+ env.execute()
+ val expected = Seq(
+ "Hello,1,1,1.0",
+ "Hello,1,1,1.0",
+ "Hello,2,3,1.5",
+ "Hello world,1,3,3.0",
+ "Hello world,2,5,2.5",
+ "Hello world,2,5,2.5",
+ "Hi,1,1,1.0",
+ "Hello world,3,9,3.0",
+ "Hello world,4,14,3.5",
+ "Hello world,5,20,4.0",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5",
+ "Hello world,6,27,4.5"
+ )
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ }
+
+
+ @Test
def testProcTimeBoundedPartitionedRowsOver(): Unit = {
val data = List(