Posted to commits@flink.apache.org by ja...@apache.org on 2021/02/12 14:55:22 UTC

[flink] branch release-1.12 updated: [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.12 by this push:
     new 1345c0f  [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API
1345c0f is described below

commit 1345c0f9a606a6e5ccffda59bb28a6ccfe054263
Author: Jane <55...@users.noreply.github.com>
AuthorDate: Fri Feb 12 22:54:59 2021 +0800

    [FLINK-21225][table-planner-blink] Support OVER window distinct aggregates in Table API
    
    This closes #14917
---
 .../expressions/converter/OverConvertRule.java     |  16 +-
 .../planner/plan/stream/table/OverWindowTest.xml   |  68 +++++++
 .../planner/plan/stream/table/OverWindowTest.scala |  49 +++++
 .../runtime/stream/table/OverWindowITCase.scala    | 220 +++++++++++++++++++++
 4 files changed, 351 insertions(+), 2 deletions(-)
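
For context, here is what the new capability looks like in the Scala Table API. This is a minimal sketch distilled from the tests added below; it assumes a `table` with fields 'a and 'c plus a rowtime attribute 'rowtime, and the usual Table API imports from the test files:

    // Distinct aggregates over an OVER window (sketch, mirroring the new tests).
    val result = table
      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
      .select(
        'c,
        'a.count.distinct over 'w, // COUNT(DISTINCT a)
        'a.sum.distinct over 'w)   // SUM(DISTINCT a)

Before this change, OverConvertRule did not handle the distinct modifier: the distinct flag of the over call was hard-coded to false, as the first diff below shows.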

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
index 220f19f..62df20d 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
@@ -70,6 +70,9 @@ public class OverConvertRule implements CallExpressionConvertRule {
         if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.OVER) {
             FlinkTypeFactory typeFactory = context.getTypeFactory();
             Expression agg = children.get(0);
+            FunctionDefinition def = ((CallExpression) agg).getFunctionDefinition();
+            boolean isDistinct = BuiltInFunctionDefinitions.DISTINCT == def;
+
             SqlAggFunction aggFunc = agg.accept(new SqlAggFunctionVisitor(context.getRelBuilder()));
             RelDataType aggResultType =
                     typeFactory.createFieldTypeFromLogicalType(
@@ -78,7 +81,16 @@ public class OverConvertRule implements CallExpressionConvertRule {
 
             // assemble exprs by agg children
             List<RexNode> aggExprs =
-                    agg.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());
+                    agg.getChildren().stream()
+                            .map(
+                                    child -> {
+                                        if (isDistinct) {
+                                            return context.toRexNode(child.getChildren().get(0));
+                                        } else {
+                                            return context.toRexNode(child);
+                                        }
+                                    })
+                            .collect(Collectors.toList());
 
             // assemble order by key
             Expression orderKeyExpr = children.get(1);
@@ -123,7 +135,7 @@ public class OverConvertRule implements CallExpressionConvertRule {
                                     isPhysical,
                                     true,
                                     false,
-                                    false));
+                                    isDistinct));
         }
         return Optional.empty();
     }
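
The shape of the fix: in the resolved expression tree, `.distinct` arrives as a DISTINCT call wrapping the aggregate, e.g. `'a.count.distinct over 'w` reaches the rule as `distinct(count(a))`. The rule therefore unwraps one level to get the aggregate's real operand, and threads `isDistinct` into the over call where `false` used to be hard-coded. A rough Scala rendering of the same logic, names as in the Java above (a sketch for readability only; imports and surrounding context elided):

    import scala.collection.JavaConverters._

    // agg is the first child of the OVER call, possibly distinct(count(a)).
    val isDistinct = agg.asInstanceOf[CallExpression]
      .getFunctionDefinition == BuiltInFunctionDefinitions.DISTINCT

    val aggExprs = agg.getChildren.asScala.map { child =>
      if (isDistinct) {
        context.toRexNode(child.getChildren.get(0)) // unwrap count(a) -> a
      } else {
        context.toRexNode(child)
      }
    }
    // isDistinct then becomes the distinct flag of the over call,
    // replacing the boolean that was previously hard-coded to false.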
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
index 2f3f38d..1df07c7 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.xml
@@ -152,6 +152,40 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRowTimeBoundedDistinctWithPartitionedRangeOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST RANGE 7200000 PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN 7200000 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowTimeBoundedDistinctWithPartitionedRowsOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS 2 PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN 2 PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testRowTimeBoundedNonPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
@@ -220,6 +254,40 @@ Calc(select=[c, w0$o0 AS _c1, w0$o1 AS wAvg])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testRowTimeUnboundedDistinctWithPartitionedRangeOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testRowTimeUnboundedDistinctWithPartitionedRowsOver">
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(c=[$2], _c1=[AS(COUNT(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c1')], _c2=[AS(SUM(DISTINCT $0) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c2')], _c3=[AS(AVG(DISTINCT AS(CAST($0):FLOAT, _UTF-16LE'a')) OVER (PARTITION BY $2 ORDER BY $4 NULLS FIRST ROWS UNBOUNDED PRECEDING), _UTF-16LE'_c3')])
++- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[c, w0$o0 AS _c1, w0$o1 AS _c2, w0$o2 AS _c3])
++- OverAggregate(partitionBy=[c], orderBy=[rowtime ASC], window=[ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], select=[a, c, rowtime, $3, COUNT(DISTINCT a) AS w0$o0, SUM(DISTINCT a) AS w0$o1, AVG(DISTINCT $3) AS w0$o2])
+   +- Exchange(distribution=[hash[c]])
+      +- Calc(select=[a, c, rowtime, CAST(a) AS $3])
+         +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testRowTimeUnboundedNonPartitionedRangeOver">
     <Resource name="planBefore">
       <![CDATA[
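
The plan tests above confirm the flag survives planning: each planAfter shows COUNT(DISTINCT a), SUM(DISTINCT a) and AVG(DISTINCT $3) inside the OverAggregate node (the `RANG BETWEEN` spelling appears to be the digest's long-standing abbreviation for RANGE windows, not a typo in the new cases). For comparison, the same aggregation in its SQL form, which drives the same physical operator; a sketch, assuming a table MyTable registered in a StreamTableEnvironment tEnv:

    // SQL form of one of the distinct over-window aggregations above (sketch).
    val result = tEnv.sqlQuery(
      """
        |SELECT
        |  c,
        |  COUNT(DISTINCT a) OVER (
        |    PARTITION BY c ORDER BY rowtime
        |    RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS cnt
        |FROM MyTable
        |""".stripMargin)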
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
index cec69be..4466091 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/OverWindowTest.scala
@@ -189,6 +189,55 @@ class OverWindowTest extends TableTestBase {
   }
 
   @Test
+  def testRowTimeBoundedDistinctWithPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.hours following CURRENT_RANGE as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
+  def testRowTimeUnboundedDistinctWithPartitionedRangeOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
+  def testRowTimeBoundedDistinctWithPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
+  def testRowTimeUnboundedDistinctWithPartitionedRowsOver(): Unit = {
+    val result = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+         CURRENT_ROW as 'w)
+      .select('c,
+        'a.count.distinct over 'w,
+        'a.sum.distinct over 'w,
+        ('a.cast(DataTypes.FLOAT) as 'a).avg.distinct over 'w)
+
+    streamUtil.verifyPlan(result)
+  }
+
+  @Test
   def testRowTimeUnboundedPartitionedRowsOver() = {
     val weightedAvg = new WeightedAvgWithRetract
 
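
The integration tests below feed timestamped elements and explicit watermarks through an EventTimeProcessOperator. The input encoding reads as follows: a Left carries an element together with its event timestamp, a Right is a watermark. A minimal sketch with illustrative values:

    // Shape of the ITCase input below (values are illustrative):
    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
      Left((14000001L, (1, 1L, "Hello"))), // element (a, b, c) at event time 14000001
      Right(14000020L)                     // watermark advancing event time to 14000020
    )

Every element in these tests respects the watermarks, so none is dropped as late: each of the 13 input rows produces exactly one row in the expected output.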
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
index 8ed3692..2104f75 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/OverWindowITCase.scala
@@ -194,6 +194,226 @@ class OverWindowITCase(mode: StateBackendMode) extends StreamingWithStateTestBas
   }
 
   @Test
+  def testRowTimeBoundedDistinctPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime
+        preceding 1.seconds following CURRENT_RANGE as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedDistinctPartitionedRangeOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w
+      )
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeBoundedDistinctPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,3,12,4.0",
+      "Hello world,3,15,5.0",
+      "Hello world,3,16,5.3333335",
+      "Hello world,3,17,5.6666665",
+      "Hello world,3,18,6.0"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
+  def testRowTimeUnBoundedDistinctPartitionedRowsOver(): Unit = {
+    val data: Seq[Either[(Long, (Int, Long, String)), Long]] = Seq(
+      Left(14000005L, (1, 1L, "Hi")),
+      Left(14000000L, (2, 1L, "Hello")),
+      Left(14000001L, (1, 1L, "Hello")),
+      Left(14000002L, (1, 2L, "Hello")),
+      Left(14000002L, (1, 3L, "Hello world")),
+      Left(14000003L, (2, 2L, "Hello world")),
+      Left(14000003L, (2, 3L, "Hello world")),
+      Right(14000020L),
+      Left(14000021L, (1, 4L, "Hello world")),
+      Left(14000022L, (1, 5L, "Hello world")),
+      Left(14000022L, (1, 6L, "Hello world")),
+      Left(14000022L, (1, 7L, "Hello world")),
+      Left(14000023L, (2, 4L, "Hello world")),
+      Left(14000023L, (2, 5L, "Hello world")),
+      Right(14000030L)
+    )
+
+    val source = failingDataSource(data)
+    val table = source.transform("TimeAssigner", new EventTimeProcessOperator[(Int, Long, String)])
+      .setParallelism(source.parallelism)
+      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+    val windowedTable = table
+      .window(Over partitionBy 'c orderBy 'rowtime preceding UNBOUNDED_ROW following
+         CURRENT_ROW as 'w)
+      .select(
+        'c,
+        'b.count.distinct over 'w,
+        'b.sum.distinct over 'w,
+        ('b.cast(DataTypes.FLOAT) as 'b).avg.distinct over 'w)
+
+    val sink = new TestingAppendSink
+    windowedTable.toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "Hello,1,1,1.0",
+      "Hello,1,1,1.0",
+      "Hello,2,3,1.5",
+      "Hello world,1,3,3.0",
+      "Hello world,2,5,2.5",
+      "Hello world,2,5,2.5",
+      "Hi,1,1,1.0",
+      "Hello world,3,9,3.0",
+      "Hello world,4,14,3.5",
+      "Hello world,5,20,4.0",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5",
+      "Hello world,6,27,4.5"
+    )
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+
+  @Test
   def testProcTimeBoundedPartitionedRowsOver(): Unit = {
 
     val data = List(