You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:20 UTC
[11/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA
logical plans consistent test
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
deleted file mode 100644
index 253d54a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/OverWindowITCase.scala
+++ /dev/null
@@ -1,864 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.stream.sql.TimeTestUtil.EventTimeSourceFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.api.watermark.Watermark
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment, TableException}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
-
-import scala.collection.mutable
-
-class OverWindowITCase extends StreamingWithStateTestBase {
-
- val data = List(
- (1L, 1, "Hello"),
- (2L, 2, "Hello"),
- (3L, 3, "Hello"),
- (4L, 4, "Hello"),
- (5L, 5, "Hello"),
- (6L, 6, "Hello"),
- (7L, 7, "Hello World"),
- (8L, 8, "Hello World"),
- (20L, 20, "Hello World"))
-
- /**
- * All aggregates must be computed on the same window.
- */
- @Test(expected = classOf[TableException])
- def testMultiWindow(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
- "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- }
-
- @Test
- def testProcTimeBoundedPartitionedRowsOver(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setParallelism(1)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a, " +
- " SUM(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " +
- " MIN(c) OVER (" +
- " PARTITION BY a ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " +
- "FROM MyTable"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,0,0",
- "2,1,1",
- "2,3,1",
- "3,3,3",
- "3,7,3",
- "3,12,3",
- "4,6,6",
- "4,13,6",
- "4,21,6",
- "4,30,6",
- "5,10,10",
- "5,21,10",
- "5,33,10",
- "5,46,10",
- "5,60,10")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testProcTimeBoundedNonPartitionedRowsOver(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStateBackend(getStateBackend)
- env.setParallelism(1)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env)
- .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
- tEnv.registerTable("MyTable", t)
-
- val sqlQuery = "SELECT a, " +
- " SUM(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW), " +
- " MIN(c) OVER (" +
- " ORDER BY proctime ROWS BETWEEN 10 PRECEDING AND CURRENT ROW) " +
- "FROM MyTable"
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,0,0",
- "2,1,0",
- "2,3,0",
- "3,6,0",
- "3,10,0",
- "3,15,0",
- "4,21,0",
- "4,28,0",
- "4,36,0",
- "4,45,0",
- "5,55,0",
- "5,66,1",
- "5,77,2",
- "5,88,3",
- "5,99,4")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testProcTimeUnboundedPartitionedRangeOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- // for sum aggregation ensure that every time the order of each element is consistent
- env.setParallelism(1)
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
- "sum(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding) " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello World,1,7", "Hello World,2,15", "Hello World,3,35",
- "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testProcTimeUnboundedPartitionedRowsOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT c, cnt1 from " +
- "(SELECT " +
- "c, " +
- "count(a) " +
- " OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
- "as cnt1 from T1)"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello World,1", "Hello World,2", "Hello World,3",
- "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testProcTimeUnboundedNonPartitionedRangeOver(): Unit = {
- val queryConfig =
- new StreamQueryConfig().withIdleStateRetentionTime(Time.hours(2), Time.hours(3))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- // for sum aggregation ensure that every time the order of each element is consistent
- env.setParallelism(1)
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, " +
- "count(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding), " +
- "sum(a) OVER (ORDER BY proctime RANGE UNBOUNDED preceding) " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row](queryConfig)
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello World,7,28", "Hello World,8,36", "Hello World,9,56",
- "Hello,1,1", "Hello,2,3", "Hello,3,6", "Hello,4,10", "Hello,5,15", "Hello,6,21")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testProcTimeUnboundedNonPartitionedRowsOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "count(a) OVER (ORDER BY proctime ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW) " +
- "from T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List("1", "2", "3", "4", "5", "6", "7", "8", "9")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeBoundedPartitionedRangeOver(): Unit = {
- val data = Seq(
- Left((1500L, (1L, 15, "Hello"))),
- Left((1600L, (1L, 16, "Hello"))),
- Left((1000L, (1L, 1, "Hello"))),
- Left((2000L, (2L, 2, "Hello"))),
- Right(1000L),
- Left((2000L, (2L, 2, "Hello"))),
- Left((2000L, (2L, 3, "Hello"))),
- Left((3000L, (3L, 3, "Hello"))),
- Right(2000L),
- Left((4000L, (4L, 4, "Hello"))),
- Right(3000L),
- Left((5000L, (5L, 5, "Hello"))),
- Right(5000L),
- Left((6000L, (6L, 6, "Hello"))),
- Left((6500L, (6L, 65, "Hello"))),
- Right(7000L),
- Left((9000L, (6L, 9, "Hello"))),
- Left((9500L, (6L, 18, "Hello"))),
- Left((9000L, (6L, 9, "Hello"))),
- Right(10000L),
- Left((10000L, (7L, 7, "Hello World"))),
- Left((11000L, (7L, 17, "Hello World"))),
- Left((11000L, (7L, 77, "Hello World"))),
- Right(12000L),
- Left((14000L, (7L, 18, "Hello World"))),
- Right(14000L),
- Left((15000L, (8L, 8, "Hello World"))),
- Right(17000L),
- Left((20000L, (20L, 20, "Hello World"))),
- Right(19000L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- " c, b, " +
- " COUNT(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
- " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
- " SUM(a) OVER (PARTITION BY c ORDER BY rowtime RANGE " +
- " BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW)" +
- " FROM T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
- "Hello,3,4,9",
- "Hello,4,2,7",
- "Hello,5,2,9",
- "Hello,6,2,11", "Hello,65,2,12",
- "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
- "Hello World,7,1,7", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
- "Hello World,8,2,15",
- "Hello World,20,1,20")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeBoundedPartitionedRowsOver(): Unit = {
- val data = Seq(
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((3L, (7L, 7, "Hello World"))),
- Left((1L, (7L, 7, "Hello World"))),
- Left((1L, (7L, 7, "Hello World"))),
- Right(2L),
- Left((3L, (3L, 3, "Hello"))),
- Left((4L, (4L, 4, "Hello"))),
- Left((5L, (5L, 5, "Hello"))),
- Left((6L, (6L, 6, "Hello"))),
- Left((20L, (20L, 20, "Hello World"))),
- Right(6L),
- Left((8L, (8L, 8, "Hello World"))),
- Left((7L, (7L, 7, "Hello World"))),
- Right(20L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- " c, a, " +
- " COUNT(a) " +
- " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW), " +
- " SUM(a) " +
- " OVER (PARTITION BY c ORDER BY rowtime ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) " +
- "FROM T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
- "Hello,3,3,7", "Hello,4,3,9", "Hello,5,3,12",
- "Hello,6,3,15",
- "Hello World,7,1,7", "Hello World,7,2,14", "Hello World,7,3,21",
- "Hello World,7,3,21", "Hello World,8,3,22", "Hello World,20,3,35")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeBoundedNonPartitionedRangeOver(): Unit = {
- val data = Seq(
- Left((1500L, (1L, 15, "Hello"))),
- Left((1600L, (1L, 16, "Hello"))),
- Left((1000L, (1L, 1, "Hello"))),
- Left((2000L, (2L, 2, "Hello"))),
- Right(1000L),
- Left((2000L, (2L, 2, "Hello"))),
- Left((2000L, (2L, 3, "Hello"))),
- Left((3000L, (3L, 3, "Hello"))),
- Right(2000L),
- Left((4000L, (4L, 4, "Hello"))),
- Right(3000L),
- Left((5000L, (5L, 5, "Hello"))),
- Right(5000L),
- Left((6000L, (6L, 6, "Hello"))),
- Left((6500L, (6L, 65, "Hello"))),
- Right(7000L),
- Left((9000L, (6L, 9, "Hello"))),
- Left((9500L, (6L, 18, "Hello"))),
- Left((9000L, (6L, 9, "Hello"))),
- Right(10000L),
- Left((10000L, (7L, 7, "Hello World"))),
- Left((11000L, (7L, 17, "Hello World"))),
- Left((11000L, (7L, 77, "Hello World"))),
- Right(12000L),
- Left((14000L, (7L, 18, "Hello World"))),
- Right(14000L),
- Left((15000L, (8L, 8, "Hello World"))),
- Right(17000L),
- Left((20000L, (20L, 20, "Hello World"))),
- Right(19000L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- " c, b, " +
- " COUNT(a) " +
- " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW), " +
- " SUM(a) " +
- " OVER (ORDER BY rowtime RANGE BETWEEN INTERVAL '1' SECOND PRECEDING AND CURRENT ROW) " +
- " FROM T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,15,2,2", "Hello,16,3,3",
- "Hello,2,6,9", "Hello,3,6,9", "Hello,2,6,9",
- "Hello,3,4,9",
- "Hello,4,2,7",
- "Hello,5,2,9",
- "Hello,6,2,11", "Hello,65,2,12",
- "Hello,9,2,12", "Hello,9,2,12", "Hello,18,3,18",
- "Hello World,7,4,25", "Hello World,17,3,21", "Hello World,77,3,21", "Hello World,18,1,7",
- "Hello World,8,2,15",
- "Hello World,20,1,20")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeBoundedNonPartitionedRowsOver(): Unit = {
-
- val data = Seq(
- Left((2L, (2L, 2, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((2L, (2L, 2, "Hello"))),
- Left((1L, (1L, 1, "Hello"))),
- Left((20L, (20L, 20, "Hello World"))), // early row
- Right(3L),
- Left((2L, (2L, 2, "Hello"))), // late row
- Left((3L, (3L, 3, "Hello"))),
- Left((4L, (4L, 4, "Hello"))),
- Left((5L, (5L, 5, "Hello"))),
- Left((6L, (6L, 6, "Hello"))),
- Left((7L, (7L, 7, "Hello World"))),
- Right(7L),
- Left((9L, (9L, 9, "Hello World"))),
- Left((8L, (8L, 8, "Hello World"))),
- Left((8L, (8L, 8, "Hello World"))),
- Right(20L))
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- env.setParallelism(1)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t1 = env
- .addSource[(Long, Int, String)](new EventTimeSourceFunction[(Long, Int, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val sqlQuery = "SELECT " +
- "c, a, " +
- " COUNT(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW), " +
- " SUM(a) OVER (ORDER BY rowtime ROWS BETWEEN 2 preceding AND CURRENT ROW) " +
- "FROM T1"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "Hello,1,1,1", "Hello,1,2,2", "Hello,1,3,3",
- "Hello,2,3,4", "Hello,2,3,5", "Hello,2,3,6",
- "Hello,3,3,7",
- "Hello,4,3,9", "Hello,5,3,12",
- "Hello,6,3,15", "Hello World,7,3,18",
- "Hello World,8,3,21", "Hello World,8,3,23",
- "Hello World,9,3,25",
- "Hello World,20,3,37")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- env.setParallelism(1)
- StreamITCase.clear
-
- val sqlQuery = "SELECT a, b, c, " +
- " SUM(b) OVER (" +
- " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " COUNT(b) OVER (" +
- " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " AVG(b) OVER (" +
- " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " MAX(b) OVER (" +
- " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " MIN(b) OVER (" +
- " PARTITION BY a ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
- "FROM T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (1, 1L, "Hello")),
- Left(14000002L, (1, 2L, "Hello")),
- Left(14000002L, (1, 3L, "Hello world")),
- Left(14000003L, (2, 2L, "Hello world")),
- Left(14000003L, (2, 3L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 4L, "Hello world")),
- Left(14000022L, (1, 5L, "Hello world")),
- Left(14000022L, (1, 6L, "Hello world")),
- Left(14000022L, (1, 7L, "Hello world")),
- Left(14000023L, (2, 4L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Right(14000030L))
-
- val t1 = env
- .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,1,Hello,6,3,2,3,1",
- "1,2,Hello,6,3,2,3,1",
- "1,3,Hello world,6,3,2,3,1",
- "1,1,Hi,7,4,1,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,6,3,2,3,1",
- "2,3,Hello world,6,3,2,3,1",
- "1,4,Hello world,11,5,2,4,1",
- "1,5,Hello world,29,8,3,7,1",
- "1,6,Hello world,29,8,3,7,1",
- "1,7,Hello world,29,8,3,7,1",
- "2,4,Hello world,15,5,3,5,1",
- "2,5,Hello world,15,5,3,5,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeUnBoundedPartitionedRowsOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.testResults = mutable.MutableList()
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "count(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "avg(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "max(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "min(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (3, 1L, "Hello")),
- Left(14000003L, (1, 2L, "Hello")),
- Left(14000004L, (1, 3L, "Hello world")),
- Left(14000007L, (3, 2L, "Hello world")),
- Left(14000008L, (2, 2L, "Hello world")),
- Right(14000010L),
- Left(14000012L, (1, 5L, "Hello world")),
- Left(14000021L, (1, 6L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Right(14000020L),
- Left(14000024L, (3, 5L, "Hello world")),
- Left(14000026L, (1, 7L, "Hello world")),
- Left(14000025L, (1, 8L, "Hello world")),
- Left(14000022L, (1, 9L, "Hello world")),
- Right(14000030L))
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,2,Hello,2,1,2,2,2",
- "1,3,Hello world,5,2,2,3,2",
- "1,1,Hi,6,3,2,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,3,2,1,2,1",
- "3,1,Hello,1,1,1,1,1",
- "3,2,Hello world,3,2,1,2,1",
- "1,5,Hello world,11,4,2,5,1",
- "1,6,Hello world,17,5,3,6,1",
- "1,9,Hello world,26,6,4,9,1",
- "1,8,Hello world,34,7,4,9,1",
- "1,7,Hello world,41,8,5,9,1",
- "2,5,Hello world,8,3,2,5,1",
- "3,5,Hello world,8,3,2,5,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeUnBoundedNonPartitionedRangeOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- env.setParallelism(1)
- StreamITCase.clear
-
- val sqlQuery = "SELECT a, b, c, " +
- " SUM(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " COUNT(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " AVG(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " MAX(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " MIN(b) OVER (ORDER BY rowtime RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
- "FROM T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (1, 1L, "Hello")),
- Left(14000002L, (1, 2L, "Hello")),
- Left(14000002L, (1, 3L, "Hello world")),
- Left(14000003L, (2, 2L, "Hello world")),
- Left(14000003L, (2, 3L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 4L, "Hello world")),
- Left(14000022L, (1, 5L, "Hello world")),
- Left(14000022L, (1, 6L, "Hello world")),
- Left(14000022L, (1, 7L, "Hello world")),
- Left(14000023L, (2, 4L, "Hello world")),
- Left(14000023L, (2, 5L, "Hello world")),
- Right(14000030L))
-
- val t1 = env
- .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "2,1,Hello,1,1,1,1,1",
- "1,1,Hello,7,4,1,3,1",
- "1,2,Hello,7,4,1,3,1",
- "1,3,Hello world,7,4,1,3,1",
- "2,2,Hello world,12,6,2,3,1",
- "2,3,Hello world,12,6,2,3,1",
- "1,1,Hi,13,7,1,3,1",
- "1,4,Hello world,17,8,2,4,1",
- "1,5,Hello world,35,11,3,7,1",
- "1,6,Hello world,35,11,3,7,1",
- "1,7,Hello world,35,11,3,7,1",
- "2,4,Hello world,44,13,3,7,1",
- "2,5,Hello world,44,13,3,7,1")
-
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testRowTimeUnBoundedNonPartitionedRowsOver(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- " SUM(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " COUNT(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " AVG(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " MAX(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), " +
- " MIN(b) OVER (ORDER BY rowtime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) " +
- "FROM T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 2L, "Hello")),
- Left(14000002L, (3, 5L, "Hello")),
- Left(14000003L, (1, 3L, "Hello")),
- Left(14000004L, (3, 7L, "Hello world")),
- Left(14000007L, (4, 9L, "Hello world")),
- Left(14000008L, (5, 8L, "Hello world")),
- Right(14000010L),
- // this element will be discard because it is late
- Left(14000008L, (6, 8L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (6, 8L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env
- .addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "2,2,Hello,2,1,2,2,2",
- "3,5,Hello,7,2,3,5,2",
- "1,3,Hello,10,3,3,5,2",
- "3,7,Hello world,17,4,4,7,2",
- "1,1,Hi,18,5,3,7,1",
- "4,9,Hello world,27,6,4,9,1",
- "5,8,Hello world,35,7,5,9,1",
- "6,8,Hello world,43,8,5,9,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test sliding event-time unbounded window with partition by **/
- @Test
- def testRowTimeUnBoundedPartitionedRowsOver2(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- env.setStateBackend(getStateBackend)
- StreamITCase.clear
- env.setParallelism(1)
-
- val sqlQuery = "SELECT a, b, c, " +
- "SUM(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "count(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "avg(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "max(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row), " +
- "min(b) over (" +
- "partition by a order by rowtime rows between unbounded preceding and current row) " +
- "from T1"
-
- val data = Seq(
- Left(14000005L, (1, 1L, "Hi")),
- Left(14000000L, (2, 1L, "Hello")),
- Left(14000002L, (3, 1L, "Hello")),
- Left(14000003L, (1, 2L, "Hello")),
- Left(14000004L, (1, 3L, "Hello world")),
- Left(14000007L, (3, 2L, "Hello world")),
- Left(14000008L, (2, 2L, "Hello world")),
- Right(14000010L),
- // the next 3 elements are late
- Left(14000008L, (1, 4L, "Hello world")),
- Left(14000008L, (2, 3L, "Hello world")),
- Left(14000008L, (3, 3L, "Hello world")),
- Left(14000012L, (1, 5L, "Hello world")),
- Right(14000020L),
- Left(14000021L, (1, 6L, "Hello world")),
- // the next 3 elements are late
- Left(14000019L, (1, 6L, "Hello world")),
- Left(14000018L, (2, 4L, "Hello world")),
- Left(14000018L, (3, 4L, "Hello world")),
- Left(14000022L, (2, 5L, "Hello world")),
- Left(14000022L, (3, 5L, "Hello world")),
- Left(14000024L, (1, 7L, "Hello world")),
- Left(14000023L, (1, 8L, "Hello world")),
- Left(14000021L, (1, 9L, "Hello world")),
- Right(14000030L)
- )
-
- val t1 = env.addSource(new EventTimeSourceFunction[(Int, Long, String)](data))
- .toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
-
- tEnv.registerTable("T1", t1)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,2,Hello,2,1,2,2,2",
- "1,3,Hello world,5,2,2,3,2",
- "1,1,Hi,6,3,2,3,1",
- "2,1,Hello,1,1,1,1,1",
- "2,2,Hello world,3,2,1,2,1",
- "3,1,Hello,1,1,1,1,1",
- "3,2,Hello world,3,2,1,2,1",
- "1,5,Hello world,11,4,2,5,1",
- "1,6,Hello world,17,5,3,6,1",
- "1,9,Hello world,26,6,4,9,1",
- "1,8,Hello world,34,7,4,9,1",
- "1,7,Hello world,41,8,5,9,1",
- "2,5,Hello world,8,3,2,5,1",
- "3,5,Hello world,8,3,2,5,1"
- )
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
deleted file mode 100644
index 55633ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit._
-
-class SqlITCase extends StreamingWithStateTestBase {
-
- /** test row stream registered table **/
- @Test
- def testRowRegister(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT * FROM MyTableRow WHERE c < 3"
-
- val data = List(
- Row.of("Hello", "Worlds", Int.box(1)),
- Row.of("Hello", "Hiden", Int.box(5)),
- Row.of("Hello again", "Worlds", Int.box(2)))
-
- implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO) // tpe is automatically
-
- val ds = env.fromCollection(data)
-
- val t = ds.toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTableRow", t)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List("Hello,Worlds,1","Hello again,Worlds,2")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test unbounded groupby (without window) **/
- @Test
- def testUnboundedGroupby(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT b, COUNT(a) FROM MyTable GROUP BY b"
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toRetractStream[Row]
- result.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = List("1,1", "2,2", "3,3", "4,4", "5,5", "6,6")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- /** test selection **/
- @Test
- def testSelectExpressionFromTable(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT a * 2, b - 1 FROM MyTable"
-
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List("2,0", "4,1", "6,1")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test filtering with registered table **/
- @Test
- def testSimpleFilter(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT * FROM MyTable WHERE a = 3"
-
- val t = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test filtering with registered datastream **/
- @Test
- def testDatastreamFilter(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT * FROM MyTable WHERE _1 = 3"
-
- val t = StreamTestData.getSmall3TupleDataStream(env)
- tEnv.registerDataStream("MyTable", t)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test union with registered tables **/
- @Test
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT * FROM T1 " +
- "UNION ALL " +
- "SELECT * FROM T2"
-
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T1", t1)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,1,Hi", "1,1,Hi",
- "2,2,Hello", "2,2,Hello",
- "3,2,Hello world", "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test union with filter **/
- @Test
- def testUnionWithFilter(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT * FROM T1 WHERE a = 3 " +
- "UNION ALL " +
- "SELECT * FROM T2 WHERE a = 2"
-
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T1", t1)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T2", t2)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- /** test union of a table and a datastream **/
- @Test
- def testUnionTableWithDataSet(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val sqlQuery = "SELECT c FROM T1 WHERE a = 3 " +
- "UNION ALL " +
- "SELECT c FROM T2 WHERE a = 2"
-
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
- tEnv.registerTable("T1", t1)
- val t2 = StreamTestData.get3TupleDataStream(env)
- tEnv.registerDataStream("T2", t2, 'a, 'b, 'c)
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List("Hello", "Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnnestPrimitiveArrayFromTable(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val data = List(
- (1, Array(12, 45), Array(Array(12, 45))),
- (2, Array(41, 5), Array(Array(18), Array(87))),
- (3, Array(18, 42), Array(Array(1), Array(45)))
- )
- val stream = env.fromCollection(data)
- tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
-
- val sqlQuery = "SELECT a, b, s FROM T, UNNEST(T.b) AS A (s)"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,[12, 45],12",
- "1,[12, 45],45",
- "2,[41, 5],41",
- "2,[41, 5],5",
- "3,[18, 42],18",
- "3,[18, 42],42"
- )
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnnestArrayOfArrayFromTable(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val data = List(
- (1, Array(12, 45), Array(Array(12, 45))),
- (2, Array(41, 5), Array(Array(18), Array(87))),
- (3, Array(18, 42), Array(Array(1), Array(45)))
- )
- val stream = env.fromCollection(data)
- tEnv.registerDataStream("T", stream, 'a, 'b, 'c)
-
- val sqlQuery = "SELECT a, s FROM T, UNNEST(T.c) AS A (s)"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "1,[12, 45]",
- "2,[18]",
- "2,[87]",
- "3,[1]",
- "3,[45]")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnnestObjectArrayFromTableWithFilter(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val data = List(
- (1, Array((12, "45.6"), (12, "45.612"))),
- (2, Array((13, "41.6"), (14, "45.2136"))),
- (3, Array((18, "42.6")))
- )
- val stream = env.fromCollection(data)
- tEnv.registerDataStream("T", stream, 'a, 'b)
-
- val sqlQuery = "SELECT a, b, s, t FROM T, UNNEST(T.b) AS A (s, t) WHERE s > 13"
-
- val result = tEnv.sql(sqlQuery).toAppendStream[Row]
- result.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = List(
- "2,[(13,41.6), (14,45.2136)],14,45.2136",
- "3,[(18,42.6)],18,42.6")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
index f95d0ab..cc9d786 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala
@@ -18,8 +18,7 @@
package org.apache.flink.table.api.scala.stream.sql
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.{TableException, ValidationException}
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
import org.apache.flink.table.api.scala._
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.utils.TableTestUtil._
@@ -31,17 +30,6 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.addTable[(Int, String, Long)](
"MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
- /**
- * OVER clause is necessary for [[OverAgg0]] window function.
- */
- @Test(expected = classOf[ValidationException])
- def testOverAggregation(): Unit = {
- streamUtil.addFunction("overAgg", new OverAgg0)
-
- val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
- streamUtil.tEnv.sql(sqlQuery)
- }
-
@Test
def testGroupbyWithoutWindow() = {
val sql = "SELECT COUNT(a) FROM MyTable GROUP BY b"
@@ -66,7 +54,7 @@ class WindowAggregateTest extends TableTestBase {
@Test
def testTumbleFunction() = {
- streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+ streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
val sql =
"SELECT " +
@@ -95,7 +83,7 @@ class WindowAggregateTest extends TableTestBase {
@Test
def testHoppingFunction() = {
- streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+ streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
val sql =
"SELECT COUNT(*), weightedAvg(c, a) AS wAvg, " +
@@ -123,7 +111,7 @@ class WindowAggregateTest extends TableTestBase {
@Test
def testSessionFunction() = {
- streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+ streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
val sql =
"SELECT " +
@@ -177,51 +165,4 @@ class WindowAggregateTest extends TableTestBase {
streamUtil.verifySql(sql, expected)
}
- @Test(expected = classOf[TableException])
- def testTumbleWindowNoOffset(): Unit = {
- val sqlQuery =
- "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
- "FROM MyTable " +
- "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
-
- streamUtil.verifySql(sqlQuery, "n/a")
- }
-
- @Test(expected = classOf[TableException])
- def testHopWindowNoOffset(): Unit = {
- val sqlQuery =
- "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
- "FROM MyTable " +
- "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
-
- streamUtil.verifySql(sqlQuery, "n/a")
- }
-
- @Test(expected = classOf[TableException])
- def testSessionWindowNoOffset(): Unit = {
- val sqlQuery =
- "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
- "FROM MyTable " +
- "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
-
- streamUtil.verifySql(sqlQuery, "n/a")
- }
-
- @Test(expected = classOf[TableException])
- def testVariableWindowSize() = {
- val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
- streamUtil.verifySql(sql, "n/a")
- }
-
- @Test(expected = classOf[ValidationException])
- def testWindowUdAggInvalidArgs(): Unit = {
- streamUtil.tEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
-
- val sqlQuery =
- "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " +
- "FROM MyTable " +
- "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
-
- streamUtil.verifySql(sqlQuery, "n/a")
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
new file mode 100644
index 0000000..b1efc09
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/AggregationsValidationTest.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql.validation
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
+import org.apache.flink.table.expressions.AggFunctionCall
+import org.apache.flink.table.functions.AggregateFunction
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Assert.{assertEquals, assertTrue}
+import org.junit.{Ignore, Test}
+
+class AggregationsValidationTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("MyTable", 'a, 'b, 'c)
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testOverAggregation(): Unit = {
+ streamUtil.addFunction("overAgg", new OverAgg0)
+
+ val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
+
+ streamUtil.tableEnv.sql(sqlQuery)
+ }
+
+ @Test
+ def testDistinct(): Unit = {
+ val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ streamTableNode(0),
+ term("groupBy", "a, b, c"),
+ term("select", "a, b, c")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+ // TODO: this query should be optimized to only have a single DataStreamGroupAggregate
+ // TODO: reopen this until FLINK-7144 fixed
+ @Ignore
+ @Test
+ def testDistinctAfterAggregate(): Unit = {
+ val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+ val expected =
+ unaryNode(
+ "DataStreamGroupAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a")
+ ),
+ term("groupBy", "a"),
+ term("select", "a")
+ )
+ streamUtil.verifySql(sql, expected)
+ }
+
+
+ @Test
+ def testUserDefinedAggregateFunctionWithScalaAccumulator(): Unit = {
+ streamUtil.addFunction("udag", new MyAgg)
+ val call = streamUtil
+ .tEnv
+ .functionCatalog
+ .lookupFunction("udag", Seq())
+ .asInstanceOf[AggFunctionCall]
+
+ val typeInfo = call.accTypeInfo
+ assertTrue(typeInfo.isInstanceOf[CaseClassTypeInfo[_]])
+ assertEquals(2, typeInfo.getTotalFields)
+ val caseTypeInfo = typeInfo.asInstanceOf[CaseClassTypeInfo[_]]
+ assertEquals(Types.LONG, caseTypeInfo.getTypeAt(0))
+ assertEquals(Types.LONG, caseTypeInfo.getTypeAt(1))
+
+ streamUtil.addFunction("udag2", new MyAgg2)
+ val call2 = streamUtil
+ .tEnv
+ .functionCatalog
+ .lookupFunction("udag2", Seq())
+ .asInstanceOf[AggFunctionCall]
+
+ val typeInfo2 = call2.accTypeInfo
+ assertTrue(s"actual type: $typeInfo2", typeInfo2.isInstanceOf[RowTypeInfo])
+ assertEquals(2, typeInfo2.getTotalFields)
+ val rowTypeInfo = typeInfo2.asInstanceOf[RowTypeInfo]
+ assertEquals(Types.LONG, rowTypeInfo.getTypeAt(0))
+ assertEquals(Types.INT, rowTypeInfo.getTypeAt(1))
+ }
+}
+
+case class MyAccumulator(var sum: Long, var count: Long)
+
+class MyAgg extends AggregateFunction[Long, MyAccumulator] {
+
+ //Overloaded accumulate method
+ def accumulate(acc: MyAccumulator, value: Long): Unit = {
+ }
+
+ override def createAccumulator(): MyAccumulator = MyAccumulator(0, 0)
+
+ override def getValue(accumulator: MyAccumulator): Long = 1L
+}
+
+class MyAgg2 extends AggregateFunction[Long, Row] {
+
+ def accumulate(acc: Row, value: Long): Unit = {}
+
+ override def createAccumulator(): Row = new Row(2)
+
+ override def getValue(accumulator: Row): Long = 1L
+
+ def getAccumulatorType: TypeInformation[_] = new RowTypeInfo(Types.LONG, Types.INT)
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..3cc13f4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/OverWindowValidationTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)]("T1", 'a, 'b, 'c, 'proctime.proctime)
+
+ /**
+ * All aggregates must be computed on the same window.
+ */
+ @Test(expected = classOf[TableException])
+ def testMultiWindow(): Unit = {
+
+ val sqlQuery = "SELECT " +
+ "c, " +
+ "count(a) OVER (PARTITION BY c ORDER BY proctime RANGE UNBOUNDED preceding), " +
+ "sum(a) OVER (PARTITION BY b ORDER BY proctime RANGE UNBOUNDED preceding) " +
+ "from T1"
+
+ streamUtil.tableEnv.sql(sqlQuery).toAppendStream[Row]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
new file mode 100644
index 0000000..0fd8740
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/validation/WindowAggregateValidationTest.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.sql.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class WindowAggregateValidationTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ streamUtil.addTable[(Int, String, Long)](
+ "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testOverAggregation(): Unit = {
+ streamUtil.addFunction("overAgg", new OverAgg0)
+
+ val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
+ streamUtil.tableEnv.sql(sqlQuery)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testTumbleWindowNoOffset(): Unit = {
+ val sqlQuery =
+ "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+ streamUtil.verifySql(sqlQuery, "n/a")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testHopWindowNoOffset(): Unit = {
+ val sqlQuery =
+ "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+ "FROM MyTable " +
+ "GROUP BY HOP(proctime, INTERVAL '1' HOUR, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+ streamUtil.verifySql(sqlQuery, "n/a")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testSessionWindowNoOffset(): Unit = {
+ val sqlQuery =
+ "SELECT SUM(a) AS sumA, COUNT(b) AS cntB " +
+ "FROM MyTable " +
+ "GROUP BY SESSION(proctime, INTERVAL '2' HOUR, TIME '10:00:00')"
+
+ streamUtil.verifySql(sqlQuery, "n/a")
+ }
+
+ @Test(expected = classOf[TableException])
+ def testVariableWindowSize() = {
+ val sql = "SELECT COUNT(*) FROM MyTable GROUP BY TUMBLE(proctime, c * INTERVAL '1' MINUTE)"
+ streamUtil.verifySql(sql, "n/a")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWindowUdAggInvalidArgs(): Unit = {
+ streamUtil.tableEnv.registerFunction("weightedAvg", new WeightedAvgWithMerge)
+
+ val sqlQuery =
+ "SELECT SUM(a) AS sumA, weightedAvg(a, b) AS wAvg " +
+ "FROM MyTable " +
+ "GROUP BY TUMBLE(proctime(), INTERVAL '2' HOUR, TIME '10:00:00')"
+
+ streamUtil.verifySql(sqlQuery, "n/a")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
deleted file mode 100644
index adf4d44..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.types.Row
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class CalcITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testSimpleSelectAll(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSelectFirst(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("1", "2", "3")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleSelectWithNaming(): Unit = {
-
- // verify ProjectMergeRule.
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
- .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
- .select('a, 'b)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
- "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
- "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleSelectAllWithAs(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('a, 'b, 'c)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[TableException])
- def testAsWithToManyFields(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[TableException])
- def testAsWithAmbiguousFields(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
-
- @Test(expected = classOf[TableException])
- def testOnlyFieldRefInAs(): Unit = {
-
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
- val results = ds.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("no")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testSimpleFilter(): Unit = {
- /*
- * Test simple filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter('a === 3)
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testAllRejectingFilter(): Unit = {
- /*
- * Test all-rejecting filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(false) )
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test
- def testAllPassingFilter(): Unit = {
- /*
- * Test all-passing filter
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( Literal(true) )
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "1,1,Hi",
- "2,2,Hello",
- "3,2,Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testFilterOnIntegerTupleField(): Unit = {
- /*
- * Test filter on Integer tuple field.
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 === 0 )
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "2,2,Hello", "4,3,Hello world, how are you?",
- "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
- "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
- "18,6,Comment#12", "20,6,Comment#14")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testNotEquals(): Unit = {
- /*
- * Test filter on Integer tuple field.
- */
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
- val filterDs = ds.filter( 'a % 2 !== 0)
- val results = filterDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
- val expected = mutable.MutableList(
- "1,1,Hi", "3,2,Hello world",
- "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
- "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
- "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
new file mode 100644
index 0000000..20a88fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExplainTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainTest
+ extends StreamingMultipleProgramsTestBase {
+
+ val testFilePath = this.getClass.getResource("/").getFile
+
+ @Test
+ def testFilter(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table = env.fromElements((1, "hello"))
+ .toTable(tEnv, 'a, 'b)
+ .filter("a % 2 = 0")
+
+ val result = replaceString(tEnv.explain(table))
+
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testFilterStream0.out").mkString
+ val expect = replaceString(source)
+ assertEquals(result, expect)
+ }
+
+ @Test
+ def testUnion(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+ val table = table1.unionAll(table2)
+
+ val result = replaceString(tEnv.explain(table))
+
+ val source = scala.io.Source.fromFile(testFilePath +
+ "../../src/test/scala/resources/testUnionStream0.out").mkString
+ val expect = replaceString(source)
+ assertEquals(result, expect)
+ }
+
+ def replaceString(s: String): String = {
+ /* Stage {id} is ignored, because id keeps incrementing in test class
+ * while StreamExecutionEnvironment is up
+ */
+ s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..fec25eb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/ExpressionReductionTest.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ExpressionReductionTest extends TableTestBase {
+
+ @Test
+ def testReduceCalcExpression(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .where('a > (1 + 7))
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "13 AS _c0",
+ "'b' AS _c1",
+ "'STRING' AS _c2",
+ "'teststring' AS _c3",
+ "1990-10-24 23:00:01.123 AS _c4",
+ "false AS _c5",
+ "true AS _c6",
+ "2E0 AS _c7",
+ "'trueX' AS _c8"
+ ),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceProjectExpression(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .select((3 + 4).toExpr + 6,
+ (11 === 1) ? ("a", "b"),
+ " STRING ".trim,
+ "test" + "string",
+ "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+ 1.isNull,
+ "TEST".like("%EST"),
+ 2.5.toExpr.floor(),
+ true.cast(Types.STRING) + "X")
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select",
+ "13 AS _c0",
+ "'b' AS _c1",
+ "'STRING' AS _c2",
+ "'teststring' AS _c3",
+ "1990-10-24 23:00:01.123 AS _c4",
+ "false AS _c5",
+ "true AS _c6",
+ "2E0 AS _c7",
+ "'trueX' AS _c8"
+ )
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testReduceFilterExpression(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+ val result = table
+ .where('a > (1 + 7))
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "a", "b", "c"),
+ term("where", ">(a, 8)")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
new file mode 100644
index 0000000..3bf9c33
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/FieldProjectionTest.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.{Upper, WindowReference}
+import org.apache.flink.table.plan.logical.TumblingGroupWindow
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{TableTestBase, _}
+import org.junit.Test
+
+/**
+ * Tests for all the situations when we can do fields projection. Like selecting few fields
+ * from a large field count source.
+ */
+class FieldProjectionTest extends TableTestBase {
+
+ val streamUtil: StreamTableTestUtil = streamTestUtil()
+
+ @Test
+ def testSelectFromWindow(): Unit = {
+ val sourceTable = streamUtil
+ .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
+ val resultTable = sourceTable
+ .window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select(Upper('c).count, 'a.sum)
+
+ val expected =
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
+ ),
+ term("window",
+ TumblingGroupWindow(
+ WindowReference("w"),
+ 'rowtime,
+ 5.millis)),
+ term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
+ )
+
+ streamUtil.verifyTable(resultTable, expected)
+ }
+
+ @Test
+ def testSelectFromGroupedWindow(): Unit = {
+ val sourceTable = streamUtil
+ .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
+ val resultTable = sourceTable
+ .window(Tumble over 5.millis on 'rowtime as 'w)
+ .groupBy('w, 'b)
+ .select(Upper('c).count, 'a.sum, 'b)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ streamTableNode(0),
+ term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
+ ),
+ term("groupBy", "b"),
+ term("window",
+ TumblingGroupWindow(
+ WindowReference("w"),
+ 'rowtime,
+ 5.millis)),
+ term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
+ ),
+ term("select", "TMP_0", "TMP_1", "b")
+ )
+
+ streamUtil.verifyTable(resultTable, expected)
+ }
+
+}
+
+
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
deleted file mode 100644
index 4a3524b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupAggregationsITCase.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.common.time.Time
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData, StreamingWithStateTestBase}
-import org.apache.flink.table.api.{StreamQueryConfig, TableEnvironment}
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase.RetractingSink
-import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-import scala.collection.mutable
-
-/**
- * Tests of groupby (without window) aggregations
- */
-class GroupAggregationsITCase extends StreamingWithStateTestBase {
- private val queryConfig = new StreamQueryConfig()
- queryConfig.withIdleStateRetentionTime(Time.hours(1), Time.hours(2))
-
-
- @Test
- def testDistinct(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('b).distinct()
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = mutable.MutableList("1", "2", "3", "4", "5", "6")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testDistinctAfterAggregate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- .groupBy('e).select('e, 'a.count).distinct()
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = mutable.MutableList("1,5", "2,7", "3,3")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testNonKeyedGroupAggregate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .select('a.sum, 'b.sum)
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink).setParallelism(1)
- env.execute()
-
- val expected = List("231,91")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testGroupAggregate(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .groupBy('b)
- .select('b, 'a.sum)
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new StreamITCase.RetractingSink)
- env.execute()
-
- val expected = List("1,1", "2,5", "3,15", "4,34", "5,65", "6,111")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testDoubleGroupAggregation(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- .groupBy('b)
- .select('a.count as 'cnt, 'b)
- .groupBy('cnt)
- .select('cnt, 'b.count as 'freq)
-
- val results = t.toRetractStream[Row](queryConfig)
-
- results.addSink(new RetractingSink)
- env.execute()
- val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-
- @Test
- def testGroupAggregateWithExpression(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.setStateBackend(getStateBackend)
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamITCase.clear
-
- val t = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- .groupBy('e, 'b % 3)
- .select('c.min, 'e, 'a.avg, 'd.count)
-
- val results = t.toRetractStream[Row](queryConfig)
- results.addSink(new RetractingSink)
- env.execute()
-
- val expected = mutable.MutableList(
- "0,1,1,1", "7,1,4,2", "2,1,3,2",
- "3,2,3,3", "1,2,3,3", "14,2,5,1",
- "12,3,5,1", "5,3,4,2")
- assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
- }
-}