You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:12 UTC
[03/44] flink git commit: [FLINK-6617][table] Improve consistency tests for
JAVA and SCALA logical plans
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
new file mode 100644
index 0000000..15828eb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/OverWindowITCase.scala
@@ -0,0 +1,838 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.sql
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
/**
 * Integration tests for SQL OVER windows (bounded/unbounded, ROWS/RANGE,
 * processing time and event time) on a streaming table environment.
 */
class OverWindowITCase extends StreamingWithStateTestBase {

  // Shared input rows (a: Long, b: Int, c: String): six "Hello" rows and
  // three "Hello World" rows, consumed by the processing-time tests below.
  val data = List(
    (1L, 1, "Hello"),
    (2L, 2, "Hello"),
    (3L, 3, "Hello"),
    (4L, 4, "Hello"),
    (5L, 5, "Hello"),
    (6L, 6, "Hello"),
    (7L, 7, "Hello World"),
    (8L, 8, "Hello World"),
    (20L, 20, "Hello World"))
+
  /** Bounded proc-time ROWS OVER window (4 PRECEDING), partitioned by a: running SUM/MIN of c. */
  @Test
  def testProcTimeBoundedPartitionedRowsOver(): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    // parallelism 1 keeps the processing-time arrival order deterministic,
    // which the hard-coded expected values rely on
    env.setParallelism(1)
    StreamITCase.clear

    val t = StreamTestData.get5TupleDataStream(env)
      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    tEnv.registerTable("MyTable", t)

    val sqlQuery = "SELECT a, " +
      " SUM(c) OVER (" +
      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
      " MIN(c) OVER (" +
      " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
      "FROM MyTable"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // one output row per input row: "a,sum,min" over the last 5 rows per key
    val expected = List(
      "1,0,0",
      "2,1,1",
      "2,3,1",
      "3,3,3",
      "3,7,3",
      "3,12,3",
      "4,6,6",
      "4,13,6",
      "4,21,6",
      "4,30,6",
      "5,10,10",
      "5,21,10",
      "5,33,10",
      "5,46,10",
      "5,60,10")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /** Bounded proc-time ROWS OVER window (10 PRECEDING) without PARTITION BY: global SUM/MIN of c. */
  @Test
  def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setStateBackend(getStateBackend)
    // parallelism 1 keeps the processing-time arrival order deterministic
    env.setParallelism(1)
    StreamITCase.clear

    val t = StreamTestData.get5TupleDataStream(env)
      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    tEnv.registerTable("MyTable", t)

    val sqlQuery = "SELECT a, " +
      " SUM(c) OVER (" +
      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
      " MIN(c) OVER (" +
      " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
      "FROM MyTable"
    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "a,sum,min" over the last 11 rows of the whole stream
    val expected = List(
      "1,0,0",
      "2,1,0",
      "2,3,0",
      "3,6,0",
      "3,10,0",
      "3,15,0",
      "4,21,0",
      "4,28,0",
      "4,36,0",
      "4,45,0",
      "5,55,0",
      "5,66,1",
      "5,77,2",
      "5,88,3",
      "5,99,4")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /** Unbounded proc-time RANGE OVER window, partitioned by c: running COUNT/SUM of a. */
  @Test
  def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    // for sum aggregation ensure that every time the order of each element is consistent
    env.setParallelism(1)

    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)

    tEnv.registerTable("T1", t1)

    val sqlQuery = "SELECT " +
      "c, " +
      "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
      "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
      "from T1"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "c,count,sum" per input row, accumulated per partition
    val expected = List(
      "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /** Unbounded proc-time ROWS OVER window, partitioned by c, wrapped in a sub-query: running COUNT. */
  @Test
  def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)

    tEnv.registerTable("T1", t1)

    // the OVER aggregate is projected through an outer SELECT to also
    // exercise alias resolution of the windowed column
    val sqlQuery = "SELECT c, cnt1 from " +
      "(SELECT " +
      "c, " +
      "count(a) " +
      " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
      "as cnt1 from T1)"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    val expected = List(
      "Hello World,1", "Hello World,2", "Hello World,3",
      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)

  }
+
  /**
   * Unbounded proc-time RANGE OVER window without PARTITION BY, executed with a
   * query config that enables idle-state retention (2h min / 3h max).
   */
  @Test
  def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
    val queryConfig =
      new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    // for sum aggregation ensure that every time the order of each element is consistent
    env.setParallelism(1)

    val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)

    tEnv.registerTable("T1", t1)

    val sqlQuery = "SELECT " +
      "c, " +
      "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " +
      "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
      "from T1"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // running count/sum over the whole stream, not per partition
    val expected = List(
      "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
      "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
+ @Test
+ def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
+
+ tEnv.registerTable("T1", t1)
+
+ val sqlQuery = "SELECT " +
+ "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
+ "from T1"
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
  /**
   * Bounded event-time RANGE OVER window (1 second preceding), partitioned by c:
   * COUNT/SUM of a. Input interleaves records and watermarks.
   */
  @Test
  def testRowTimeBoundedPartitionedRangeOver(): Unit = {
    // Left((ts, record)) presumably emits a timestamped record and Right(ts) a
    // watermark — NOTE(review): confirm against EventTimeSourceFunction.
    val data = Seq(
      Left((1500L, (1L, 15, "Hello"))),
      Left((1600L, (1L, 16, "Hello"))),
      Left((1000L, (1L, 1, "Hello"))),
      Left((2000L, (2L, 2, "Hello"))),
      Right(1000L),
      Left((2000L, (2L, 2, "Hello"))),
      Left((2000L, (2L, 3, "Hello"))),
      Left((3000L, (3L, 3, "Hello"))),
      Right(2000L),
      Left((4000L, (4L, 4, "Hello"))),
      Right(3000L),
      Left((5000L, (5L, 5, "Hello"))),
      Right(5000L),
      Left((6000L, (6L, 6, "Hello"))),
      Left((6500L, (6L, 65, "Hello"))),
      Right(7000L),
      Left((9000L, (6L, 9, "Hello"))),
      Left((9500L, (6L, 18, "Hello"))),
      Left((9000L, (6L, 9, "Hello"))),
      Right(10000L),
      Left((10000L, (7L, 7, "Hello World"))),
      Left((11000L, (7L, 17, "Hello World"))),
      Left((11000L, (7L, 77, "Hello World"))),
      Right(12000L),
      Left((14000L, (7L, 18, "Hello World"))),
      Right(14000L),
      Left((15000L, (8L, 8, "Hello World"))),
      Right(17000L),
      Left((20000L, (20L, 20, "Hello World"))),
      Right(19000L))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t1 = env
      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val sqlQuery = "SELECT " +
      " c, b, " +
      " COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
      " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
      " SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
      " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
      " FROM T1"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "c,b,count,sum" — rows with the same rowtime share the same window result
    val expected = List(
      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
      "Hello,3,4,9",
      "Hello,4,2,7",
      "Hello,5,2,9",
      "Hello,6,2,11", "Hello,65,2,12",
      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
      "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
      "Hello World,8,2,15",
      "Hello World,20,1,20")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /**
   * Bounded event-time ROWS OVER window (2 PRECEDING), partitioned by c:
   * COUNT/SUM of a with watermarks interleaved in the input.
   */
  @Test
  def testRowTimeBoundedPartitionedRowsOver(): Unit = {
    // Left((ts, record)) presumably emits a timestamped record and Right(ts) a
    // watermark — NOTE(review): confirm against EventTimeSourceFunction.
    val data = Seq(
      Left((1L, (1L, 1, "Hello"))),
      Left((2L, (2L, 2, "Hello"))),
      Left((1L, (1L, 1, "Hello"))),
      Left((2L, (2L, 2, "Hello"))),
      Left((2L, (2L, 2, "Hello"))),
      Left((1L, (1L, 1, "Hello"))),
      Left((3L, (7L, 7, "Hello World"))),
      Left((1L, (7L, 7, "Hello World"))),
      Left((1L, (7L, 7, "Hello World"))),
      Right(2L),
      Left((3L, (3L, 3, "Hello"))),
      Left((4L, (4L, 4, "Hello"))),
      Left((5L, (5L, 5, "Hello"))),
      Left((6L, (6L, 6, "Hello"))),
      Left((20L, (20L, 20, "Hello World"))),
      Right(6L),
      Left((8L, (8L, 8, "Hello World"))),
      Left((7L, (7L, 7, "Hello World"))),
      Right(20L))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t1 = env
      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val sqlQuery = "SELECT " +
      " c, a, " +
      " COUNT(a) " +
      " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
      " SUM(a) " +
      " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
      "FROM T1"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "c,a,count,sum" over the last 3 rows per partition
    val expected = List(
      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
      "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
      "Hello,6,3,15",
      "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
      "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /**
   * Bounded event-time RANGE OVER window (1 second preceding) without PARTITION BY:
   * COUNT/SUM of a over the whole stream. Same input as the partitioned variant.
   */
  @Test
  def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
    // Left((ts, record)) presumably emits a timestamped record and Right(ts) a
    // watermark — NOTE(review): confirm against EventTimeSourceFunction.
    val data = Seq(
      Left((1500L, (1L, 15, "Hello"))),
      Left((1600L, (1L, 16, "Hello"))),
      Left((1000L, (1L, 1, "Hello"))),
      Left((2000L, (2L, 2, "Hello"))),
      Right(1000L),
      Left((2000L, (2L, 2, "Hello"))),
      Left((2000L, (2L, 3, "Hello"))),
      Left((3000L, (3L, 3, "Hello"))),
      Right(2000L),
      Left((4000L, (4L, 4, "Hello"))),
      Right(3000L),
      Left((5000L, (5L, 5, "Hello"))),
      Right(5000L),
      Left((6000L, (6L, 6, "Hello"))),
      Left((6500L, (6L, 65, "Hello"))),
      Right(7000L),
      Left((9000L, (6L, 9, "Hello"))),
      Left((9500L, (6L, 18, "Hello"))),
      Left((9000L, (6L, 9, "Hello"))),
      Right(10000L),
      Left((10000L, (7L, 7, "Hello World"))),
      Left((11000L, (7L, 17, "Hello World"))),
      Left((11000L, (7L, 77, "Hello World"))),
      Right(12000L),
      Left((14000L, (7L, 18, "Hello World"))),
      Right(14000L),
      Left((15000L, (8L, 8, "Hello World"))),
      Right(17000L),
      Left((20000L, (20L, 20, "Hello World"))),
      Right(19000L))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t1 = env
      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val sqlQuery = "SELECT " +
      " c, b, " +
      " COUNT(a) " +
      " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
      " SUM(a) " +
      " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
      " FROM T1"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // differs from the partitioned test only where windows span both keys,
    // e.g. "Hello World,7,4,25"
    val expected = List(
      "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
      "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
      "Hello,3,4,9",
      "Hello,4,2,7",
      "Hello,5,2,9",
      "Hello,6,2,11", "Hello,65,2,12",
      "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
      "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
      "Hello World,8,2,15",
      "Hello World,20,1,20")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /**
   * Bounded event-time ROWS OVER window (2 PRECEDING) without PARTITION BY,
   * including an early row and a late row around the watermarks.
   */
  @Test
  def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {

    val data = Seq(
      Left((2L, (2L, 2, "Hello"))),
      Left((2L, (2L, 2, "Hello"))),
      Left((1L, (1L, 1, "Hello"))),
      Left((1L, (1L, 1, "Hello"))),
      Left((2L, (2L, 2, "Hello"))),
      Left((1L, (1L, 1, "Hello"))),
      Left((20L, (20L, 20, "Hello World"))), // early row
      Right(3L),
      Left((2L, (2L, 2, "Hello"))), // late row
      Left((3L, (3L, 3, "Hello"))),
      Left((4L, (4L, 4, "Hello"))),
      Left((5L, (5L, 5, "Hello"))),
      Left((6L, (6L, 6, "Hello"))),
      Left((7L, (7L, 7, "Hello World"))),
      Right(7L),
      Left((9L, (9L, 9, "Hello World"))),
      Left((8L, (8L, 8, "Hello World"))),
      Left((8L, (8L, 8, "Hello World"))),
      Right(20L))

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    // deterministic record order is needed because rows with equal rowtime
    // are tie-broken by arrival order
    env.setParallelism(1)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.clear

    val t1 = env
      .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val sqlQuery = "SELECT " +
      "c, a, " +
      " COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
      " SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
      "FROM T1"

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // note: the late row does not appear in the expected output
    val expected = List(
      "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
      "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
      "Hello,3,3,7",
      "Hello,4,3,9", "Hello,5,3,12",
      "Hello,6,3,15", "Hello World,7,3,18",
      "Hello World,8,3,21", "Hello World,8,3,23",
      "Hello World,9,3,25",
      "Hello World,20,3,37")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
  /**
   * Unbounded event-time RANGE OVER window, partitioned by a: SUM/COUNT/AVG/MAX/MIN
   * of b. Rows sharing a rowtime see the same (combined) window result.
   */
  @Test
  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    env.setParallelism(1)
    StreamITCase.clear

    val sqlQuery = "SELECT a, b, c, " +
      " SUM(b) OVER (" +
      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " COUNT(b) OVER (" +
      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " AVG(b) OVER (" +
      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " MAX(b) OVER (" +
      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " MIN(b) OVER (" +
      " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
      "FROM T1"

    // Left((ts, record)) presumably emits a timestamped record and Right(ts) a
    // watermark — NOTE(review): confirm against EventTimeSourceFunction.
    val data = Seq(
      Left(14000005L, (1, 1L, "Hi")),
      Left(14000000L, (2, 1L, "Hello")),
      Left(14000002L, (1, 1L, "Hello")),
      Left(14000002L, (1, 2L, "Hello")),
      Left(14000002L, (1, 3L, "Hello world")),
      Left(14000003L, (2, 2L, "Hello world")),
      Left(14000003L, (2, 3L, "Hello world")),
      Right(14000020L),
      Left(14000021L, (1, 4L, "Hello world")),
      Left(14000022L, (1, 5L, "Hello world")),
      Left(14000022L, (1, 6L, "Hello world")),
      Left(14000022L, (1, 7L, "Hello world")),
      Left(14000023L, (2, 4L, "Hello world")),
      Left(14000023L, (2, 5L, "Hello world")),
      Right(14000030L))

    val t1 = env
      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "a,b,c,sum,count,avg,max,min"
    val expected = List(
      "1,1,Hello,6,3,2,3,1",
      "1,2,Hello,6,3,2,3,1",
      "1,3,Hello world,6,3,2,3,1",
      "1,1,Hi,7,4,1,3,1",
      "2,1,Hello,1,1,1,1,1",
      "2,2,Hello world,6,3,2,3,1",
      "2,3,Hello world,6,3,2,3,1",
      "1,4,Hello world,11,5,2,4,1",
      "1,5,Hello world,29,8,3,7,1",
      "1,6,Hello world,29,8,3,7,1",
      "1,7,Hello world,29,8,3,7,1",
      "2,4,Hello world,15,5,3,5,1",
      "2,5,Hello world,15,5,3,5,1")
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
+ @Test
+ def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val sqlQuery = "SELECT a, b, c, " +
+ "SUM(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "count(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "avg(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "max(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row), " +
+ "min(b) over (" +
+ "partition by a order by rowtime rows between unbounded preceding and current row) " +
+ "from T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 1L, "Hello")),
+ Left(14000002L, (3, 1L, "Hello")),
+ Left(14000003L, (1, 2L, "Hello")),
+ Left(14000004L, (1, 3L, "Hello world")),
+ Left(14000007L, (3, 2L, "Hello world")),
+ Left(14000008L, (2, 2L, "Hello world")),
+ Right(14000010L),
+ Left(14000012L, (1, 5L, "Hello world")),
+ Left(14000021L, (1, 6L, "Hello world")),
+ Left(14000023L, (2, 5L, "Hello world")),
+ Right(14000020L),
+ Left(14000024L, (3, 5L, "Hello world")),
+ Left(14000026L, (1, 7L, "Hello world")),
+ Left(14000025L, (1, 8L, "Hello world")),
+ Left(14000022L, (1, 9L, "Hello world")),
+ Right(14000030L))
+
+ val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,2,Hello,2,1,2,2,2",
+ "1,3,Hello world,5,2,2,3,2",
+ "1,1,Hi,6,3,2,3,1",
+ "2,1,Hello,1,1,1,1,1",
+ "2,2,Hello world,3,2,1,2,1",
+ "3,1,Hello,1,1,1,1,1",
+ "3,2,Hello world,3,2,1,2,1",
+ "1,5,Hello world,11,4,2,5,1",
+ "1,6,Hello world,17,5,3,6,1",
+ "1,9,Hello world,26,6,4,9,1",
+ "1,8,Hello world,34,7,4,9,1",
+ "1,7,Hello world,41,8,5,9,1",
+ "2,5,Hello world,8,3,2,5,1",
+ "3,5,Hello world,8,3,2,5,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
  /**
   * Unbounded event-time RANGE OVER window without PARTITION BY:
   * global SUM/COUNT/AVG/MAX/MIN of b.
   */
  @Test
  def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    env.setParallelism(1)
    StreamITCase.clear

    val sqlQuery = "SELECT a, b, c, " +
      " SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
      " MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
      "FROM T1"

    // Left((ts, record)) presumably emits a timestamped record and Right(ts) a
    // watermark — NOTE(review): confirm against EventTimeSourceFunction.
    val data = Seq(
      Left(14000005L, (1, 1L, "Hi")),
      Left(14000000L, (2, 1L, "Hello")),
      Left(14000002L, (1, 1L, "Hello")),
      Left(14000002L, (1, 2L, "Hello")),
      Left(14000002L, (1, 3L, "Hello world")),
      Left(14000003L, (2, 2L, "Hello world")),
      Left(14000003L, (2, 3L, "Hello world")),
      Right(14000020L),
      Left(14000021L, (1, 4L, "Hello world")),
      Left(14000022L, (1, 5L, "Hello world")),
      Left(14000022L, (1, 6L, "Hello world")),
      Left(14000022L, (1, 7L, "Hello world")),
      Left(14000023L, (2, 4L, "Hello world")),
      Left(14000023L, (2, 5L, "Hello world")),
      Right(14000030L))

    val t1 = env
      .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "a,b,c,sum,count,avg,max,min" accumulated over the whole stream
    val expected = List(
      "2,1,Hello,1,1,1,1,1",
      "1,1,Hello,7,4,1,3,1",
      "1,2,Hello,7,4,1,3,1",
      "1,3,Hello world,7,4,1,3,1",
      "2,2,Hello world,12,6,2,3,1",
      "2,3,Hello world,12,6,2,3,1",
      "1,1,Hi,13,7,1,3,1",
      "1,4,Hello world,17,8,2,4,1",
      "1,5,Hello world,35,11,3,7,1",
      "1,6,Hello world,35,11,3,7,1",
      "1,7,Hello world,35,11,3,7,1",
      "2,4,Hello world,44,13,3,7,1",
      "2,5,Hello world,44,13,3,7,1")

    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }
+
+ @Test
+ def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setStateBackend(getStateBackend)
+ StreamITCase.clear
+ env.setParallelism(1)
+
+ val sqlQuery = "SELECT a, b, c, " +
+ " SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
+ " MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
+ "FROM T1"
+
+ val data = Seq(
+ Left(14000005L, (1, 1L, "Hi")),
+ Left(14000000L, (2, 2L, "Hello")),
+ Left(14000002L, (3, 5L, "Hello")),
+ Left(14000003L, (1, 3L, "Hello")),
+ Left(14000004L, (3, 7L, "Hello world")),
+ Left(14000007L, (4, 9L, "Hello world")),
+ Left(14000008L, (5, 8L, "Hello world")),
+ Right(14000010L),
+ // this element will be discard because it is late
+ Left(14000008L, (6, 8L, "Hello world")),
+ Right(14000020L),
+ Left(14000021L, (6, 8L, "Hello world")),
+ Right(14000030L)
+ )
+
+ val t1 = env
+ .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
+ .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
+
+ tEnv.registerTable("T1", t1)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello,2,1,2,2,2",
+ "3,5,Hello,7,2,3,5,2",
+ "1,3,Hello,10,3,3,5,2",
+ "3,7,Hello world,17,4,4,7,2",
+ "1,1,Hi,18,5,3,7,1",
+ "4,9,Hello world,27,6,4,9,1",
+ "5,8,Hello world,35,7,5,9,1",
+ "6,8,Hello world,43,8,5,9,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
  /** test sliding event-time unbounded window with partition by **/
  @Test
  def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setStateBackend(getStateBackend)
    StreamITCase.clear
    env.setParallelism(1)

    val sqlQuery = "SELECT a, b, c, " +
      "SUM(b) over (" +
      "partition by a order by rowtime rows between unbounded preceding and current row), " +
      "count(b) over (" +
      "partition by a order by rowtime rows between unbounded preceding and current row), " +
      "avg(b) over (" +
      "partition by a order by rowtime rows between unbounded preceding and current row), " +
      "max(b) over (" +
      "partition by a order by rowtime rows between unbounded preceding and current row), " +
      "min(b) over (" +
      "partition by a order by rowtime rows between unbounded preceding and current row) " +
      "from T1"

    // same input as testRowTimeUnBoundedPartitionedRowsOver but with extra late
    // elements injected after each watermark; expected output is unchanged
    // because the late rows must be dropped
    val data = Seq(
      Left(14000005L, (1, 1L, "Hi")),
      Left(14000000L, (2, 1L, "Hello")),
      Left(14000002L, (3, 1L, "Hello")),
      Left(14000003L, (1, 2L, "Hello")),
      Left(14000004L, (1, 3L, "Hello world")),
      Left(14000007L, (3, 2L, "Hello world")),
      Left(14000008L, (2, 2L, "Hello world")),
      Right(14000010L),
      // the next 3 elements are late
      Left(14000008L, (1, 4L, "Hello world")),
      Left(14000008L, (2, 3L, "Hello world")),
      Left(14000008L, (3, 3L, "Hello world")),
      Left(14000012L, (1, 5L, "Hello world")),
      Right(14000020L),
      Left(14000021L, (1, 6L, "Hello world")),
      // the next 3 elements are late
      Left(14000019L, (1, 6L, "Hello world")),
      Left(14000018L, (2, 4L, "Hello world")),
      Left(14000018L, (3, 4L, "Hello world")),
      Left(14000022L, (2, 5L, "Hello world")),
      Left(14000022L, (3, 5L, "Hello world")),
      Left(14000024L, (1, 7L, "Hello world")),
      Left(14000023L, (1, 8L, "Hello world")),
      Left(14000021L, (1, 9L, "Hello world")),
      Right(14000030L)
    )

    val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
      .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)

    tEnv.registerTable("T1", t1)

    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()

    // "a,b,c,sum,count,avg,max,min"
    val expected = List(
      "1,2,Hello,2,1,2,2,2",
      "1,3,Hello world,5,2,2,3,2",
      "1,1,Hi,6,3,2,3,1",
      "2,1,Hello,1,1,1,1,1",
      "2,2,Hello world,3,2,1,2,1",
      "3,1,Hello,1,1,1,1,1",
      "3,2,Hello world,3,2,1,2,1",
      "1,5,Hello world,11,4,2,5,1",
      "1,6,Hello world,17,5,3,6,1",
      "1,9,Hello world,26,6,4,9,1",
      "1,8,Hello world,34,7,4,9,1",
      "1,7,Hello world,41,8,5,9,1",
      "2,5,Hello world,8,3,2,5,1",
      "3,5,Hello world,8,3,2,5,1"
    )
    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
  }

}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
new file mode 100644
index 0000000..ec719d2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/SqlITCase.scala
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.sql
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+class SqlITCase extends StreamingWithStateTestBase {
+
+ /** test row stream registered table **/
+ @Test
+ def testRowRegister(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
+
+ val data = List(
+ Row.of("Hello", "Worlds", Int.box(1)),
+ // c = 5, so this row is filtered out by WHERE c < 3 below
+ Row.of("Hello", "Hiden", Int.box(5)),
+ Row.of("Hello again", "Worlds", Int.box(2)))
+
+ implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO) // tpe is automatically picked up by fromCollection via this implicit
+
+ val ds = env.fromCollection(data)
+
+ val t = ds.toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTableRow", t)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test unbounded groupBy (without window) **/
+ @Test
+ def testUnboundedGroupBy(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ // unbounded aggregation emits updates, hence a retract stream + RetractingSink
+ val result = tEnv.sql(sqlQuery).toRetractStream[Row]
+ result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+ env.execute()
+
+ val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ /** test selection **/
+ @Test
+ def testSelectExpressionFromTable(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("2,0", "4,1", "6,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test filtering with registered table **/
+ @Test
+ def testSimpleFilter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test filtering with registered datastream **/
+ @Test
+ def testDatastreamFilter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ // no field aliases here, so SQL addresses tuple fields by default names (_1, _2, ...)
+ val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
+
+ val t = StreamTestData.getSmall3TupleDataStream(env)
+ tEnv.registerDataStream("MyTable", t)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test union with registered tables **/
+ @Test
+ def testUnion(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT * FROM T1 " +
+ "UNION ALL " +
+ "SELECT * FROM T2"
+
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T1", t1)
+ val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ // UNION ALL keeps duplicates: each row appears once per input table
+ val expected = List(
+ "1,1,Hi", "1,1,Hi",
+ "2,2,Hello", "2,2,Hello",
+ "3,2,Hello world", "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test union with filter **/
+ @Test
+ def testUnionWithFilter(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
+ "UNION ALL " +
+ "SELECT * FROM T2 WHERE a = 2"
+
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T1", t1)
+ val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T2", t2)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "2,2,Hello",
+ "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test union of a table and a datastream **/
+ @Test
+ def testUnionTableWithDataSet(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
+ "UNION ALL " +
+ "SELECT c FROM T2 WHERE a = 2"
+
+ val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+ tEnv.registerTable("T1", t1)
+ val t2 = StreamTestData.get3TupleDataStream(env)
+ tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List("Hello", "Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test UNNEST on a primitive array column: one output row per array element **/
+ @Test
+ def testUnnestPrimitiveArrayFromTable(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array(12, 45), Array(Array(12, 45))),
+ (2, Array(41, 5), Array(Array(18), Array(87))),
+ (3, Array(18, 42), Array(Array(1), Array(45)))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "1,[12, 45],12",
+ "1,[12, 45],45",
+ "2,[41, 5],41",
+ "2,[41, 5],5",
+ "3,[18, 42],18",
+ "3,[18, 42],42"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test UNNEST on an array-of-arrays column: each inner array becomes one row **/
+ @Test
+ def testUnnestArrayOfArrayFromTable(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array(12, 45), Array(Array(12, 45))),
+ (2, Array(41, 5), Array(Array(18), Array(87))),
+ (3, Array(18, 42), Array(Array(1), Array(45)))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
+
+ val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "1,[12, 45]",
+ "2,[18]",
+ "2,[87]",
+ "3,[1]",
+ "3,[45]")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ /** test UNNEST on an object (tuple) array with a predicate on an unnested field **/
+ @Test
+ def testUnnestObjectArrayFromTableWithFilter(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val data = List(
+ (1, Array((12, "45.6"), (12, "45.612"))),
+ (2, Array((13, "41.6"), (14, "45.2136"))),
+ (3, Array((18, "42.6")))
+ )
+ val stream = env.fromCollection(data)
+ tEnv.registerDataStream("T", stream, 'a, 'b)
+
+ val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
+
+ val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+ result.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = List(
+ "2,[(13,41.6), (14,45.2136)],14,45.2136",
+ "3,[(18,42.6)],18,42.6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
new file mode 100644
index 0000000..31e7c5a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/sql/TableSourceITCase.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.CommonTestData
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+
+ // Verifies that a CsvTableSource registered on the TableEnvironment can be queried
+ // with SQL (projection with quoted reserved identifiers + filter) as an append stream.
+ @Test
+ def testCsvTableSource(): Unit = {
+
+ val csvTable = CommonTestData.getCsvTableSource
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ tEnv.registerTableSource("persons", csvTable)
+
+ tEnv.sql(
+ "SELECT id, `first`, `last`, score FROM persons WHERE id < 4 ")
+ .toAppendStream[Row]
+ .addSink(new StreamITCase.StringSink[Row])
+
+ env.execute()
+
+ // compare sorted because the sink does not guarantee arrival order
+ val expected = mutable.MutableList(
+ "1,Mike,Smith,12.3",
+ "2,Bob,Taylor,45.6",
+ "3,Sam,Miller,7.89")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
new file mode 100644
index 0000000..2e8f206
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/CalcITCase.scala
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.expressions.utils.{RichFunc1, RichFunc2}
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.UserDefinedFunctionTestUtils
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcITCase extends StreamingMultipleProgramsTestBase {
+
+ // IT cases for Table API calc operations (select / filter) and scalar UDF calls
+ // on streaming tables; results are collected via StreamITCase sinks and compared sorted.
+
+ @Test
+ def testSimpleSelectAll(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hi",
+ "2,2,Hello",
+ "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testSelectFirst(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("1", "2", "3")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testSimpleSelectWithNaming(): Unit = {
+
+ // verify ProjectMergeRule.
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+ .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+ .select('a, 'b)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
+ "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
+ "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testSimpleSelectAllWithAs(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a, 'b, 'c)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hi",
+ "2,2,Hello",
+ "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testSimpleFilter(): Unit = {
+ /*
+ * Test simple filter
+ */
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter('a === 3)
+ val results = filterDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+ /*
+ * Test all-rejecting filter
+ */
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( Literal(false) )
+ val results = filterDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ // a constant-false predicate must produce no output at all
+ assertEquals(true, StreamITCase.testResults.isEmpty)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+ /*
+ * Test all-passing filter
+ */
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( Literal(true) )
+ val results = filterDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "1,1,Hi",
+ "2,2,Hello",
+ "3,2,Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testFilterOnIntegerTupleField(): Unit = {
+ /*
+ * Test filter on Integer tuple field.
+ */
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( 'a % 2 === 0 )
+ val results = filterDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "2,2,Hello", "4,3,Hello world, how are you?",
+ "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+ "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+ "18,6,Comment#12", "20,6,Comment#14")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testNotEquals(): Unit = {
+ /*
+ * Test filter on Integer tuple field.
+ */
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+ val filterDs = ds.filter( 'a % 2 !== 0)
+ val results = filterDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = mutable.MutableList(
+ "1,1,Hi", "3,2,Hello world",
+ "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+ "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+ "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testUserDefinedFunctionWithParameter(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ tEnv.registerFunction("RichFunc2", new RichFunc2)
+ // "string.value" is set as a job parameter; RichFunc2 presumably reads it at
+ // open() time to build the "ABC#Hello" result — verify against RichFunc2's impl
+ UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "ABC"))
+
+ StreamITCase.testResults = mutable.MutableList()
+
+ val result = StreamTestData.get3TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .where("RichFunc2(c)='ABC#Hello'")
+ .select('c)
+
+ val results = result.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("Hello")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test
+ def testMultipleUserDefinedFunctions(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ tEnv.registerFunction("RichFunc1", new RichFunc1)
+ tEnv.registerFunction("RichFunc2", new RichFunc2)
+ UserDefinedFunctionTestUtils.setJobParameters(env, Map("string.value" -> "Abc"))
+
+ StreamITCase.testResults = mutable.MutableList()
+
+ // mixes two UDFs in one predicate with || and && (string-expression syntax)
+ val result = StreamTestData.get3TupleDataStream(env)
+ .toTable(tEnv, 'a, 'b, 'c)
+ .where("RichFunc2(c)='Abc#Hello' || RichFunc1(a)=3 && b=2")
+ .select('c)
+
+ val results = result.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("Hello", "Hello world")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
new file mode 100644
index 0000000..8df14e8
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupAggregationsITCase.scala
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.table
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.datastream.StreamITCase.RetractingSink
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.table.runtime.datastream.{StreamITCase, StreamingWithStateTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Tests of groupby (without window) aggregations
+ */
+class GroupAggregationsITCase extends StreamingWithStateTestBase {
+ // shared query config: aggregation state idles out between 1h (min) and 2h (max)
+ private val queryConfig = new StreamQueryConfig()
+ queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+
+ @Test
+ def testDistinct(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('b).distinct()
+
+ // distinct produces updates, so collect via a retract stream
+ val results = t.toRetractStream[Row](queryConfig)
+ results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+ env.execute()
+
+ val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testDistinctAfterAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e).select('e, 'a.count).distinct()
+
+ val results = t.toRetractStream[Row](queryConfig)
+ results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+ env.execute()
+
+ val expected = mutable.MutableList("1,5", "2,7", "3,3")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testNonKeyedGroupAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ // global (non-keyed) aggregation over the whole stream
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .select('a.sum, 'b.sum)
+
+ val results = t.toRetractStream[Row](queryConfig)
+ results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
+ env.execute()
+
+ val expected = List("231,91")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testGroupAggregate(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('b, 'a.sum)
+
+ val results = t.toRetractStream[Row](queryConfig)
+ results.addSink(new StreamITCase.RetractingSink)
+ env.execute()
+
+ val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testDoubleGroupAggregation(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ // two chained aggregations: count of 'a per 'b, then frequency of each count value
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ .groupBy('b)
+ .select('a.count as 'cnt, 'b)
+ .groupBy('cnt)
+ .select('cnt, 'b.count as 'freq)
+
+ val results = t.toRetractStream[Row](queryConfig)
+
+ results.addSink(new RetractingSink)
+ env.execute()
+ val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+
+ @Test
+ def testGroupAggregateWithExpression(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStateBackend(getStateBackend)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.clear
+
+ // grouping key contains a computed expression ('b % 3) in addition to a field
+ val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .groupBy('e, 'b % 3)
+ .select('c.min, 'e, 'a.avg, 'd.count)
+
+ val results = t.toRetractStream[Row](queryConfig)
+ results.addSink(new RetractingSink)
+ env.execute()
+
+ val expected = mutable.MutableList(
+ "0,1,1,1", "7,1,4,2", "2,1,3,2",
+ "3,2,3,3", "1,2,3,3", "14,2,5,1",
+ "12,3,5,1", "5,3,4,2")
+ assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
new file mode 100644
index 0000000..1af85d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/table/GroupWindowAggregationsITCase.scala
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.datastream.table
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.common.time.Time
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge}
+import org.apache.flink.table.functions.aggfunctions.CountAggFunction
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.runtime.datastream.table.GroupWindowAggregationsITCase._
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * We only test some aggregations until better testing of constructed DataStream
+ * programs is possible.
+ */
+class GroupWindowAggregationsITCase extends StreamingMultipleProgramsTestBase {
+ // Query config used by the processing-time tests; keeps idle state for
+ // at least 1 hour and at most 2 hours.
+ private val queryConfig = new StreamQueryConfig()
+ queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
+
+ // 3-field fixture: (event timestamp, int value, string key).
+ val data = List(
+ (1L, 1, "Hi"),
+ (2L, 2, "Hello"),
+ (4L, 2, "Hello"),
+ (8L, 3, "Hello world"),
+ (16L, 3, "Hello world"))
+
+ // 6-field fixture covering several numeric types plus a string key;
+ // the first field doubles as the event timestamp in the event-time tests.
+ val data2 = List(
+ (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+ (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+ (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+ (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+ (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+ (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+ (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+ // Sliding count window (size 2 rows, slide 1 row) on processing time,
+ // grouped by key, mixing built-in and user-defined aggregates.
+ @Test
+ def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+ .groupBy('w, 'string)
+ .select('string, countFun('int), 'int.avg,
+ weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+ val results = windowedTable.toAppendStream[Row](queryConfig)
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq("Hello world,1,3,8,3", "Hello world,2,3,12,3", "Hello,1,2,2,2",
+ "Hello,2,2,3,2", "Hi,1,1,1,1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Event-time session window (5ms gap); exercises the "merge" path of the
+ // user-defined aggregate via out-of-order input and a delayed watermark.
+ @Test
+ def testEventTimeSessionGroupWindowOverTime(): Unit = {
+ //To verify the "merge" functionality, we create this test with the following characteristics:
+ // 1. set the Parallelism to 1, and have the test data out of order
+ // 2. create a waterMark with 10ms offset to delay the window emission by 10ms
+ val sessionWindowTestdata = List(
+ (1L, 1, "Hello"),
+ (2L, 2, "Hello"),
+ (8L, 8, "Hello"),
+ (9L, 9, "Hello World"),
+ (4L, 4, "Hello"),
+ (16L, 16, "Hello"))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvgWithMerge
+
+ val stream = env
+ .fromCollection(sessionWindowTestdata)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Session withGap 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, countFun('int), 'int.avg,
+ weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq("Hello World,1,9,9,9", "Hello,1,16,16,16", "Hello,4,3,5,5")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Tumbling count window (2 rows) on processing time with NO grouping key
+ // (global window), so all records fall into the same window stream.
+ @Test
+ def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'proctime.proctime)
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Tumble over 2.rows on 'proctime as 'w)
+ .groupBy('w)
+ .select(countFun('string), 'int.avg,
+ weightAvgFun('long, 'int), weightAvgFun('int, 'int))
+
+ val results = windowedTable.toAppendStream[Row](queryConfig)
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq("2,1,1,1", "2,2,6,2")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Event-time tumbling window (5ms) with a wide select, including the
+ // window start/end properties.
+ @Test
+ def testEventTimeTumblingWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](0L))
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime)
+ val countFun = new CountAggFunction
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Tumble over 5.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, countFun('string), 'int.avg, weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int), 'int.min, 'int.max, 'int.sum, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "Hello world,1,3,8,3,3,3,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+ "Hello world,1,3,16,3,3,3,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+ "Hello,2,2,3,2,2,2,4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hi,1,1,1,1,1,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Grouping keys may be omitted from the projection: only the aggregate
+ // value is selected while the window is still keyed by several columns.
+ @Test
+ def testGroupWindowWithoutKeyInProjection(): Unit = {
+ val data = List(
+ (1L, 1, "Hi", 1, 1),
+ (2L, 2, "Hello", 2, 2),
+ (4L, 2, "Hello", 2, 2),
+ (8L, 3, "Hello world", 3, 3),
+ (16L, 3, "Hello world", 3, 3))
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env.fromCollection(data)
+ val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 'proctime.proctime)
+
+ val weightAvgFun = new WeightedAvg
+
+ val windowedTable = table
+ .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
+ .groupBy('w, 'int2, 'int3, 'string)
+ .select(weightAvgFun('long, 'int))
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq("12", "8", "2", "3", "1")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+
+
+ // ----------------------------------------------------------------------------------------------
+ // Sliding windows
+ // ----------------------------------------------------------------------------------------------
+
+ // Ungrouped event-time sliding window (5ms size / 2ms slide); windows
+ // overlap, so each record contributes to multiple windows.
+ @Test
+ def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data2)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+ val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 2.milli on 'long as 'w)
+ .groupBy('w)
+ .select('int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+ "1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+ "1,1970-01-01 00:00:00.014,1970-01-01 00:00:00.019",
+ "1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+ "2,1969-12-31 23:59:59.998,1970-01-01 00:00:00.003",
+ "2,1970-01-01 00:00:00.006,1970-01-01 00:00:00.011",
+ "3,1970-01-01 00:00:00.002,1970-01-01 00:00:00.007",
+ "3,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+ "4,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Sliding window where the size (10ms) is an exact multiple of the slide
+ // (5ms): every pane is fully contained in overlapping windows.
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeOverlappingFullPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data2)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+ val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 10.milli every 5.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+ "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02",
+ "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025",
+ "Hello,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015",
+ "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+ "Hello,3,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01",
+ "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Sliding window where size (5ms) is NOT a multiple of the slide (4ms):
+ // panes are split across overlapping windows.
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeOverlappingSplitPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data2)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+ val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 4.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hello world,1,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+ "Hello world,1,1970-01-01 00:00:00.008,1970-01-01 00:00:00.013",
+ "Hello world,1,1970-01-01 00:00:00.012,1970-01-01 00:00:00.017",
+ "Hello world,1,1970-01-01 00:00:00.016,1970-01-01 00:00:00.021",
+ "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hello,2,1970-01-01 00:00:00.004,1970-01-01 00:00:00.009",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Sliding window whose slide (10ms) exceeds its size (5ms): windows do not
+ // overlap and some records fall into no window at all.
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeNonOverlappingFullPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data2)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+ val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 5.milli every 10.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // Non-overlapping sliding window (3ms size / 10ms slide) where the slide
+ // is not a multiple of the size.
+ @Test
+ def testEventTimeSlidingGroupWindowOverTimeNonOverlappingSplitPane(): Unit = {
+ // please keep this test in sync with the DataSet variant
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data2)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+ val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val windowedTable = table
+ .window(Slide over 3.milli every 10.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ // The rowtime attribute is appended via 'rowtime.rowtime after a map()
+ // instead of being derived from an explicit field of the input tuples.
+ @Test
+ def testEventTimeGroupWindowWithoutExplicitTimeField(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data2)
+ .assignTimestampsAndWatermarks(
+ new TimestampAndWatermarkWithOffset[(Long, Int, Double, Float, BigDecimal, String)](0L))
+ .map(t => (t._2, t._6))
+ val table = stream.toTable(tEnv, 'int, 'string, 'rowtime.rowtime)
+
+ val windowedTable = table
+ .window(Slide over 3.milli every 10.milli on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count, 'w.start, 'w.end)
+
+ val results = windowedTable.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+ val expected = Seq(
+ "Hallo,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003",
+ "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.003")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+}
+
+object GroupWindowAggregationsITCase {
+
+ /**
+ * Timestamp/watermark assigner for tuples whose first field carries the
+ * event timestamp as a Long. A punctuated watermark is produced for every
+ * element, trailing the extracted timestamp by the given offset so that
+ * window emission can be delayed in the tests.
+ */
+ class TimestampAndWatermarkWithOffset[T <: Product](
+ offset: Long) extends AssignerWithPunctuatedWatermarks[T] {
+
+ // Watermark lags the element's timestamp by `offset` milliseconds.
+ override def checkAndGetNextWatermark(
+ lastElement: T,
+ extractedTimestamp: Long): Watermark =
+ new Watermark(extractedTimestamp - offset)
+
+ // Event time is read from the tuple's first field.
+ override def extractTimestamp(
+ element: T,
+ previousElementTimestamp: Long): Long =
+ element.productElement(0).asInstanceOf[Long]
+ }
+
+}