You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:33 UTC
[04/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..f8d9c92
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.java.{DataSet => JDataSet, ExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment => ScalaExecutionEnv, _}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.{PojoTableFunc, TableFunc2, _}
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  /**
+    * Builds the same table function joins once through the Scala expression DSL and once
+    * through Java string expressions and checks every pair with `verifyTableEquals`.
+    */
+  @Test
+  def testJavaScalaTableAPIEquality(): Unit = {
+    // mocked Scala and Java DataSets sharing one (INT, LONG, STRING) row type
+    val scalaDs = mock(classOf[DataSet[Row]])
+    val javaDs = mock(classOf[JDataSet[Row]])
+    val rowType = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    when(scalaDs.javaSet).thenReturn(javaDs)
+    when(javaDs.getType).thenReturn(rowType)
+
+    // Scala side
+    val scalaEnv = mock(classOf[ScalaExecutionEnv])
+    val scalaTableEnv = TableEnvironment.getTableEnvironment(scalaEnv)
+    val in1 = scalaDs.toTable(scalaTableEnv).as('a, 'b, 'c)
+
+    // Java side
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val in2 = javaTableEnv.fromDataSet(javaDs).as("a, b, c")
+    javaTableEnv.registerTable("MyTable", in2)
+
+    // cross join
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+    val scalaCross = in1.join(func1('c) as 's).select('c, 's)
+    val javaCross = in2.join("func1(c).as(s)").select("c, s")
+    verifyTableEquals(scalaCross, javaCross)
+
+    // left outer join
+    val scalaLeftOuter = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+    val javaLeftOuter = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+    verifyTableEquals(scalaLeftOuter, javaLeftOuter)
+
+    // overloaded eval with a second argument
+    val scalaOverloaded = in1.join(func1('c, "$") as 's).select('c, 's)
+    val javaOverloaded = in2.join("func1(c, '$') as (s)").select("c, s")
+    verifyTableEquals(scalaOverloaded, javaOverloaded)
+
+    // custom result type
+    val func2 = new TableFunc2
+    javaTableEnv.registerFunction("func2", func2)
+    val scalaCustomType = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    val javaCustomType = in2.join("func2(c).as(name, len)").select("c, name, len")
+    verifyTableEquals(scalaCustomType, javaCustomType)
+
+    // hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    javaTableEnv.registerFunction("hierarchy", hierarchy)
+    val scalaHierarchy = in1
+      .join(hierarchy('c) as ('name, 'adult, 'len))
+      .select('c, 'name, 'len, 'adult)
+    val javaHierarchy = in2
+      .join("AS(hierarchy(c), name, adult, len)")
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaHierarchy, javaHierarchy)
+
+    // pojo type
+    val pojo = new PojoTableFunc
+    javaTableEnv.registerFunction("pojo", pojo)
+    val scalaPojo = in1.join(pojo('c)).select('c, 'name, 'age)
+    val javaPojo = in2.join("pojo(c)").select("c, name, age")
+    verifyTableEquals(scalaPojo, javaPojo)
+
+    // join followed by a filter
+    val scalaFiltered = in1
+      .join(func2('c) as ('name, 'len))
+      .select('c, 'name, 'len)
+      .filter('len > 2)
+    val javaFiltered = in2
+      .join("func2(c) as (name, len)")
+      .select("c, name, len")
+      .filter("len > 2")
+    verifyTableEquals(scalaFiltered, javaFiltered)
+
+    // scalar function nested in the table function call
+    val scalaNested = in1.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+    val javaNested = in2.join("func1(substring(c, 2)) as (s)").select("a, c, s")
+    verifyTableEquals(scalaNested, javaNested)
+  }
+
+  /** Verifies the batch plan of an inner join with a table function, including an overload. */
+  @Test
+  def testCrossJoin(): Unit = {
+    val testUtil = batchTestUtil()
+    val sourceTable = testUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val func1 = testUtil.addFunction("func1", new TableFunc1)
+
+    // expected plan of an INNER correlate; only the invocation term differs between the cases
+    def innerCorrelatePlan(invocation: String) = unaryNode(
+      "DataSetCalc",
+      unaryNode(
+        "DataSetCorrelate",
+        batchTableNode(0),
+        term("invocation", invocation),
+        term("function", func1),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    val singleArgResult = sourceTable.join(func1('c) as 's).select('c, 's)
+    val singleArgPlan = innerCorrelatePlan(s"$func1($$2)")
+    testUtil.verifyTable(singleArgResult, singleArgPlan)
+
+    // overloaded eval with a second argument
+    val twoArgResult = sourceTable.join(func1('c, "$") as 's).select('c, 's)
+    val twoArgPlan = innerCorrelatePlan(s"$func1($$2, '$$')")
+    testUtil.verifyTable(twoArgResult, twoArgPlan)
+  }
+
+  /** Verifies the batch plan of a left outer join with a table function. */
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val testUtil = batchTestUtil()
+    val sourceTable = testUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val func1 = testUtil.addFunction("func1", new TableFunc1)
+
+    val joined = sourceTable.leftOuterJoin(func1('c) as 's).select('c, 's)
+
+    val correlate = unaryNode(
+      "DataSetCorrelate",
+      batchTableNode(0),
+      term("invocation", s"$func1($$2)"),
+      term("function", func1),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+      term("joinType", "LEFT")
+    )
+    val expectedPlan = unaryNode("DataSetCalc", correlate, term("select", "c", "s"))
+
+    testUtil.verifyTable(joined, expectedPlan)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
new file mode 100644
index 0000000..ef425d3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/SortTestUtils.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+object SortTestUtils {
+
+  /** Sample (Int, Long, String) rows used as input by sort tests. */
+  val tupleDataSetStrings: List[(Int, Long, String)] = List(
+    (1, 1L, "Hi"),
+    (2, 2L, "Hello"),
+    (3, 2L, "Hello world"),
+    (4, 3L, "Hello world, how are you?"),
+    (5, 3L, "I am fine."),
+    (6, 3L, "Luke Skywalker"),
+    (7, 4L, "Comment#1"),
+    (8, 4L, "Comment#2"),
+    (9, 4L, "Comment#3"),
+    (10, 4L, "Comment#4"),
+    (11, 5L, "Comment#5"),
+    (12, 5L, "Comment#6"),
+    (13, 5L, "Comment#7"),
+    (14, 5L, "Comment#8"),
+    (15, 5L, "Comment#9"),
+    (16, 6L, "Comment#10"),
+    (17, 6L, "Comment#11"),
+    (18, 6L, "Comment#12"),
+    (19, 6L, "Comment#13"),
+    (20, 6L, "Comment#14"),
+    (21, 6L, "Comment#15")
+  )
+
+  /**
+    * Sorts the whole data set with the given ordering and renders it as expected-result text.
+    *
+    * @param dataSet  rows to sort
+    * @param ordering ordering applied to the rows
+    * @return one row per line, with all parentheses removed
+    */
+  def sortExpectedly(dataSet: List[Product])
+      (implicit ordering: Ordering[Product]): String =
+    sortExpectedly(dataSet, 0, dataSet.length)(ordering)
+
+  /**
+    * Sorts the data set, keeps the rows in the index range [start, end) of the sorted result
+    * and renders them as expected-result text.
+    *
+    * @param dataSet  rows to sort
+    * @param start    index of the first sorted row to keep (inclusive)
+    * @param end      index after the last sorted row to keep (exclusive)
+    * @param ordering ordering applied to the rows
+    * @return one row per line, with all parentheses removed
+    */
+  def sortExpectedly(dataSet: List[Product], start: Int, end: Int)
+      (implicit ordering: Ordering[Product]): String = {
+    val window = dataSet.sorted(ordering).slice(start, end)
+    val rendered = window.mkString("\n")
+    // rows are rendered via toString, e.g. "(1,1,Hi)"; the parentheses are stripped
+    rendered.replaceAll("[\\(\\)]", "")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
new file mode 100644
index 0000000..45315d6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+import java.util
+
+import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConversions._
+
+class TableProgramsTestBase(
+    mode: TestExecutionMode,
+    tableConfigMode: TableConfigMode)
+  extends MultipleProgramsTestBase(mode) {
+
+  /**
+    * Builds a fresh [[TableConfig]] for the configured mode: null checks are switched off for
+    * `NO_NULL`, efficient type usage is switched on for `EFFICIENT`, and any other mode keeps
+    * the defaults of a new [[TableConfig]].
+    */
+  def config: TableConfig = {
+    val tableConfig = new TableConfig
+    if (tableConfigMode == NO_NULL) {
+      tableConfig.setNullCheck(false)
+    } else if (tableConfigMode == EFFICIENT) {
+      tableConfig.setEfficientTypeUsage(true)
+    }
+    // every other mode keeps the default settings
+    tableConfig
+  }
+}
+
+object TableProgramsTestBase {
+  /** Pair of flags identifying a table configuration variant for parameterized tests. */
+  case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
+
+  val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
+  val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
+  val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
+
+  /**
+    * Parameter sets for the JUnit `Parameterized` runner: a single combination of the
+    * COLLECTION execution mode with the DEFAULT table config.
+    *
+    * @return a collection holding one (execution mode, table config mode) argument array
+    */
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    // built with java.util directly rather than through the deprecated implicit
+    // scala.collection.JavaConversions Seq-to-Collection conversion
+    util.Collections.singletonList(Array[AnyRef](TestExecutionMode.COLLECTION, DEFAULT))
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala
new file mode 100644
index 0000000..7b168bb
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/ExplainStreamTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainStreamTest
+  extends StreamingMultipleProgramsTestBase {
+
+  val testFilePath = ExplainStreamTest.this.getClass.getResource("/").getFile
+
+  /** Compares the explain output of a filtered stream table with testFilterStream0.out. */
+  @Test
+  def testFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = replaceString(tEnv.explain(table))
+
+    val expect = replaceString(readExpectedPlan("testFilterStream0.out"))
+    // JUnit's argument order is (expected, actual)
+    assertEquals(expect, result)
+  }
+
+  /** Compares the explain output of a union of two stream tables with testUnionStream0.out. */
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = replaceString(tEnv.explain(table))
+
+    val expect = replaceString(readExpectedPlan("testUnionStream0.out"))
+    // JUnit's argument order is (expected, actual)
+    assertEquals(expect, result)
+  }
+
+  /**
+    * Normalizes an explain string for comparison: converts CRLF line endings to LF and removes
+    * every "Stage {id}" token.
+    *
+    * @param s raw explain output or expected file content
+    * @return the normalized text
+    */
+  def replaceString(s: String): String = {
+    /* Stage {id} is ignored, because id keeps incrementing in test class
+     * while StreamExecutionEnvironment is up
+     */
+    s.replaceAll("\\r\\n", "\n").replaceAll("Stage \\d+", "")
+  }
+
+  /** Reads an expected-plan file from the test resources and closes the underlying source. */
+  private def readExpectedPlan(fileName: String): String = {
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/" + fileName)
+    try {
+      source.mkString
+    } finally {
+      source.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
new file mode 100644
index 0000000..cdc4329
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSinkITCase.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import java.io.File
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sinks.CsvTableSink
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Test
+
+class TableSinkITCase extends StreamingMultipleProgramsTestBase {
+
+  /**
+    * Writes a filtered and projected stream table through a [[CsvTableSink]] and compares
+    * the written lines with the expected rows.
+    */
+  @Test
+  def testStreamTableSink(): Unit = {
+
+    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
+    tmpFile.deleteOnExit()
+    val path = tmpFile.toURI.toString
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    env.setParallelism(4)
+
+    val input = StreamTestData.get3TupleDataStream(env)
+      .map(x => x).setParallelism(4) // increase DOP to 4
+
+    // writeToSink is called for its side effect only, so its result is not bound to a local
+    input.toTable(tEnv, 'a, 'b, 'c)
+      .where('a < 5 || 'a > 17)
+      .select('c, 'b)
+      .writeToSink(new CsvTableSink(path))
+
+    env.execute()
+
+    val expected = Seq(
+      "Hi,1", "Hello,2", "Hello world,2", "Hello world, how are you?,3",
+      "Comment#12,6", "Comment#13,6", "Comment#14,6", "Comment#15,6").mkString("\n")
+
+    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
new file mode 100644
index 0000000..ce910db
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceITCase.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream
+
+import java.io.{File, FileOutputStream, OutputStreamWriter}
+
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.sources.{CsvTableSource, StreamTableSource}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class TableSourceITCase extends StreamingMultipleProgramsTestBase {
+
+  /** Queries a registered [[TestStreamTableSource]] through the Table API. */
+  @Test
+  def testStreamTableSourceTableAPI(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
+    tEnv.ingest("MyTestTable")
+      .where('amount < 4)
+      .select('amount * 'id, 'name)
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "0,Record_0", "0,Record_16", "0,Record_32",
+      "1,Record_1", "17,Record_17", "36,Record_18",
+      "4,Record_2", "57,Record_19", "9,Record_3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Queries a registered [[TestStreamTableSource]] through SQL. */
+  @Test
+  def testStreamTableSourceSQL(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    tEnv.registerTableSource("MyTestTable", new TestStreamTableSource(33))
+    tEnv.sql(
+      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "0,Record_0", "0,Record_16", "0,Record_32",
+      "1,Record_1", "17,Record_17", "36,Record_18",
+      "4,Record_2", "57,Record_19", "9,Record_3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+    * Writes a CSV file with custom field/row delimiters, a header line and a comment line,
+    * then queries it through a [[CsvTableSource]] with SQL.
+    */
+  @Test
+  def testCsvTableSource(): Unit = {
+
+    val csvRecords = Seq(
+      "First#Id#Score#Last",
+      "Mike#1#12.3#Smith",
+      "Bob#2#45.6#Taylor",
+      "Sam#3#7.89#Miller",
+      "Peter#4#0.12#Smith",
+      "% Just a comment",
+      "Liz#5#34.5#Williams",
+      "Sally#6#6.78#Miller",
+      "Alice#7#90.1#Smith",
+      "Kelly#8#2.34#Williams"
+    )
+
+    val tempFile = File.createTempFile("csv-test", "tmp")
+    tempFile.deleteOnExit()
+    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
+    // close the writer even when the write fails, so the file handle is not leaked
+    try {
+      tmpWriter.write(csvRecords.mkString("$"))
+    } finally {
+      tmpWriter.close()
+    }
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val csvTable = new CsvTableSource(
+      tempFile.getAbsolutePath,
+      Array("first", "id", "score", "last"),
+      Array(
+        BasicTypeInfo.STRING_TYPE_INFO,
+        BasicTypeInfo.INT_TYPE_INFO,
+        BasicTypeInfo.DOUBLE_TYPE_INFO,
+        BasicTypeInfo.STRING_TYPE_INFO
+      ),
+      fieldDelim = "#",
+      rowDelim = "$",
+      ignoreFirstLine = true,
+      ignoreComments = "%"
+    )
+
+    tEnv.registerTableSource("csvTable", csvTable)
+    tEnv.sql(
+      "SELECT last, score, id FROM csvTable WHERE id < 4 ")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "Smith,12.3,1",
+      "Taylor,45.6,2",
+      "Miller,7.89,3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+
+class TestStreamTableSource(val numRecords: Int) extends StreamTableSource[Row] {
+
+  /** Field types in column order, matching the names "name", "id" and "amount". */
+  val fieldTypes: Array[TypeInformation[_]] = Array(
+    BasicTypeInfo.STRING_TYPE_INFO,
+    BasicTypeInfo.LONG_TYPE_INFO,
+    BasicTypeInfo.INT_TYPE_INFO
+  )
+
+  /** Column names of the table. */
+  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
+
+  /** Column types of the table. */
+  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
+
+  /** Number of columns; the type array above has three entries. */
+  override def getNumberOfFields: Int = fieldTypes.length
+
+  /** Row [[TypeInformation]] built from the column types. */
+  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
+
+  /**
+    * Provides the table data as a [[DataStream]] fed by a [[GeneratingSourceFunction]] that
+    * is created with `numRecords`; the source parallelism is set to 1.
+    */
+  override def getDataStream(execEnv: environment.StreamExecutionEnvironment): DataStream[Row] = {
+    val generator = new GeneratingSourceFunction(numRecords)
+    val source = execEnv.addSource(generator, getReturnType)
+    source.setParallelism(1)
+  }
+}
+
+/**
+  * Source that emits `num` rows of the form ("Record_&lt;i&gt;", i, i % 16) for i = 0 until num,
+  * or fewer if it is cancelled first.
+  *
+  * @param num number of rows to emit
+  */
+class GeneratingSourceFunction(val num: Long) extends SourceFunction[Row] {
+
+  // Written by cancel() and polled by the run() loop. Marked volatile so that a cancel()
+  // issued from another thread becomes visible to the loop.
+  @volatile var running = true
+
+  /** Emits rows through `ctx` until `num` rows were produced or the source was cancelled. */
+  override def run(ctx: SourceContext[Row]): Unit = {
+    var cnt = 0L
+    while (running && cnt < num) {
+      val out = new Row(3)
+      out.setField(0, s"Record_$cnt")
+      out.setField(1, cnt)
+      out.setField(2, (cnt % 16).toInt)
+
+      ctx.collect(out)
+      cnt += 1
+    }
+  }
+
+  /** Requests the run() loop to stop before its next iteration. */
+  override def cancel(): Unit = {
+    running = false
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
new file mode 100644
index 0000000..97e76fa
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/SqlITCase.scala
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert._
+import org.junit._
+
+import scala.collection.mutable
+
+class SqlITCase extends StreamingMultipleProgramsTestBase {
+
+  /** Projects arithmetic expressions from a registered table. */
+  @Test
+  def testSelectExpressionFromTable(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val source = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("MyTable", source)
+
+    tableEnv.sql("SELECT a * 2, b - 1 FROM MyTable")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+    streamEnv.execute()
+
+    val expectedRows = mutable.MutableList("2,0", "4,1", "6,1")
+    assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Filters a registered table. */
+  @Test
+  def testSimpleFilter(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val source = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("MyTable", source)
+
+    tableEnv.sql("SELECT * FROM MyTable WHERE a = 3")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+    streamEnv.execute()
+
+    val expectedRows = mutable.MutableList("3,2,Hello world")
+    assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Filters a data stream that was registered directly, using its tuple field names. */
+  @Test
+  def testDatastreamFilter(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = StreamTestData.getSmall3TupleDataStream(streamEnv)
+    tableEnv.registerDataStream("MyTable", stream)
+
+    tableEnv.sql("SELECT * FROM MyTable WHERE _1 = 3")
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+    streamEnv.execute()
+
+    val expectedRows = mutable.MutableList("3,2,Hello world")
+    assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Unions two registered tables with UNION ALL. */
+  @Test
+  def testUnion(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val left = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("T1", left)
+    val right = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("T2", right)
+
+    val unionQuery = "SELECT * FROM T1 " +
+      "UNION ALL " +
+      "SELECT * FROM T2"
+    tableEnv.sql(unionQuery)
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+    streamEnv.execute()
+
+    val expectedRows = mutable.MutableList(
+      "1,1,Hi", "1,1,Hi",
+      "2,2,Hello", "2,2,Hello",
+      "3,2,Hello world", "3,2,Hello world")
+    assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Unions two registered tables, each restricted by its own filter. */
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val left = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("T1", left)
+    val right = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("T2", right)
+
+    val unionQuery = "SELECT * FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT * FROM T2 WHERE a = 2"
+    tableEnv.sql(unionQuery)
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+    streamEnv.execute()
+
+    val expectedRows = mutable.MutableList(
+      "2,2,Hello",
+      "3,2,Hello world")
+    assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Unions a registered table with a registered data stream. */
+  @Test
+  def testUnionTableWithDataSet(): Unit = {
+    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(streamEnv)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val left = StreamTestData.getSmall3TupleDataStream(streamEnv)
+      .toTable(tableEnv)
+      .as('a, 'b, 'c)
+    tableEnv.registerTable("T1", left)
+    val rightStream = StreamTestData.get3TupleDataStream(streamEnv)
+    tableEnv.registerDataStream("T2", rightStream, 'a, 'b, 'c)
+
+    val unionQuery = "SELECT c FROM T1 WHERE a = 3 " +
+      "UNION ALL " +
+      "SELECT c FROM T2 WHERE a = 2"
+    tableEnv.sql(unionQuery)
+      .toDataStream[Row]
+      .addSink(new StreamITCase.StringSink)
+    streamEnv.execute()
+
+    val expectedRows = mutable.MutableList("Hello", "Hello world")
+    assertEquals(expectedRows.sorted, StreamITCase.testResults.sorted)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..58eedd0
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.{HierarchyTableFunction, PojoTableFunc, TableFunc2}
+import org.apache.flink.table.utils._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Plan tests for streaming SQL queries that join a table with a user-defined table
+  * function through LATERAL TABLE. Every test registers "MyTable" plus one function and
+  * compares the plan of a query against an expected DataStreamCalc/DataStreamCorrelate
+  * tree using `verifySql`.
+  */
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  /**
+    * Assembles the expected plan shared by all tests in this class: a DataStreamCalc
+    * whose input is a DataStreamCorrelate over `streamTableNode(0)`.
+    *
+    * @param invocation expected "invocation" term of the correlate node
+    * @param function   expected "function" term (canonical class name of the function)
+    * @param rowType    expected "rowType" term of the correlate node
+    * @param joinType   expected "joinType" term of the correlate node
+    * @param select     values of the "select" term of the calc node
+    * @param condition  optional "condition" term, placed after the join type
+    */
+  private def calcOverCorrelate(
+      invocation: String,
+      function: String,
+      rowType: String,
+      joinType: String,
+      select: Seq[String],
+      condition: Option[String] = None) = {
+
+    val correlateTerms = Seq(
+      term("invocation", invocation),
+      term("function", function),
+      term("rowType", rowType),
+      term("joinType", joinType)
+    ) ++ condition.map(cond => term("condition", cond)).toSeq
+
+    unaryNode(
+      "DataStreamCalc",
+      unaryNode("DataStreamCorrelate", streamTableNode(0), correlateTerms: _*),
+      term("select", select: _*)
+    )
+  }
+
+  /** Cross join with func1, first with one argument and then with the two-argument overload. */
+  @Test
+  def testCrossJoin(): Unit = {
+    val planUtil = streamTestUtil()
+    val tableFunc = new TableFunc1
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planUtil.addFunction("func1", tableFunc)
+
+    val joinedRowType =
+      "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)"
+
+    planUtil.verifySql(
+      "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c)) AS T(s)",
+      calcOverCorrelate(
+        invocation = "func1($cor0.c)",
+        function = tableFunc.getClass.getCanonicalName,
+        rowType = joinedRowType,
+        joinType = "INNER",
+        select = Seq("c", "f0 AS s")))
+
+    // test overloading
+    planUtil.verifySql(
+      "SELECT c, s FROM MyTable, LATERAL TABLE(func1(c, '$')) AS T(s)",
+      calcOverCorrelate(
+        invocation = "func1($cor0.c, '$')",
+        function = tableFunc.getClass.getCanonicalName,
+        rowType = joinedRowType,
+        joinType = "INNER",
+        select = Seq("c", "f0 AS s")))
+  }
+
+  /** LEFT JOIN LATERAL ... ON TRUE must produce a correlate with join type LEFT. */
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val planUtil = streamTestUtil()
+    val tableFunc = new TableFunc1
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planUtil.addFunction("func1", tableFunc)
+
+    planUtil.verifySql(
+      "SELECT c, s FROM MyTable LEFT JOIN LATERAL TABLE(func1(c)) AS T(s) ON TRUE",
+      calcOverCorrelate(
+        invocation = "func1($cor0.c)",
+        function = tableFunc.getClass.getCanonicalName,
+        rowType =
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)",
+        joinType = "LEFT",
+        select = Seq("c", "f0 AS s")))
+  }
+
+  /** func2 contributes two columns (f0, f1) that the query renames to name and len. */
+  @Test
+  def testCustomType(): Unit = {
+    val planUtil = streamTestUtil()
+    val tableFunc = new TableFunc2
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planUtil.addFunction("func2", tableFunc)
+
+    planUtil.verifySql(
+      "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len)",
+      calcOverCorrelate(
+        invocation = "func2($cor0.c)",
+        function = tableFunc.getClass.getCanonicalName,
+        rowType = "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+          "VARCHAR(2147483647) f0, INTEGER f1)",
+        joinType = "INNER",
+        select = Seq("c", "f0 AS name", "f1 AS len")))
+  }
+
+  /** HierarchyTableFunction contributes three columns that are selected through T.*. */
+  @Test
+  def testHierarchyType(): Unit = {
+    val planUtil = streamTestUtil()
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val hierarchyFunc = new HierarchyTableFunction
+    planUtil.addFunction("hierarchy", hierarchyFunc)
+
+    planUtil.verifySql(
+      "SELECT c, T.* FROM MyTable, LATERAL TABLE(hierarchy(c)) AS T(name, adult, len)",
+      calcOverCorrelate(
+        invocation = "hierarchy($cor0.c)",
+        function = hierarchyFunc.getClass.getCanonicalName,
+        rowType = "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+          " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)",
+        joinType = "INNER",
+        select = Seq("c", "f0 AS name", "f1 AS adult", "f2 AS len")))
+  }
+
+  /** Without an alias list the columns of PojoTableFunc appear as age and name. */
+  @Test
+  def testPojoType(): Unit = {
+    val planUtil = streamTestUtil()
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val pojoFunc = new PojoTableFunc
+    planUtil.addFunction("pojo", pojoFunc)
+
+    planUtil.verifySql(
+      "SELECT c, name, age FROM MyTable, LATERAL TABLE(pojo(c))",
+      calcOverCorrelate(
+        invocation = "pojo($cor0.c)",
+        function = pojoFunc.getClass.getCanonicalName,
+        rowType = "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+          " INTEGER age, VARCHAR(2147483647) name)",
+        joinType = "INNER",
+        select = Seq("c", "name", "age")))
+  }
+
+  /** The WHERE predicate on len is expected as a "condition" term of the correlate node. */
+  @Test
+  def testFilter(): Unit = {
+    val planUtil = streamTestUtil()
+    val tableFunc = new TableFunc2
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planUtil.addFunction("func2", tableFunc)
+
+    planUtil.verifySql(
+      "SELECT c, name, len FROM MyTable, LATERAL TABLE(func2(c)) AS T(name, len) " +
+        "WHERE len > 2",
+      calcOverCorrelate(
+        invocation = "func2($cor0.c)",
+        function = tableFunc.getClass.getCanonicalName,
+        rowType = "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+          "VARCHAR(2147483647) f0, INTEGER f1)",
+        joinType = "INNER",
+        select = Seq("c", "f0 AS name", "f1 AS len"),
+        condition = Some(">($1, 2)")))
+  }
+
+  /** A scalar call (SUBSTRING) used as the table function argument shows up in the invocation. */
+  @Test
+  def testScalarFunction(): Unit = {
+    val planUtil = streamTestUtil()
+    val tableFunc = new TableFunc1
+    planUtil.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    planUtil.addFunction("func1", tableFunc)
+
+    planUtil.verifySql(
+      "SELECT c, s FROM MyTable, LATERAL TABLE(func1(SUBSTRING(c, 2))) AS T(s)",
+      calcOverCorrelate(
+        invocation = "func1(SUBSTRING($cor0.c, 2))",
+        function = tableFunc.getClass.getCanonicalName,
+        rowType =
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) f0)",
+        joinType = "INNER",
+        select = Seq("c", "f0 AS s")))
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
new file mode 100644
index 0000000..f41cae1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/AggregationsITCase.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.table.GroupWindowITCase.TimestampWithEqualWatermark
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.watermark.Watermark
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.TableEnvironment
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * We only test some aggregations until better testing of constructed DataStream
+  * programs is possible.
+  *
+  * Every test runs a group-window aggregation over [[data]], writes the result rows to
+  * a `StreamITCase.StringSink` and compares the sorted contents of
+  * `StreamITCase.testResults` with the sorted expectation.
+  */
+class AggregationsITCase extends StreamingMultipleProgramsTestBase {
+
+  val data = List(
+    (1L, 1, "Hi"),
+    (2L, 2, "Hello"),
+    (4L, 2, "Hello"),
+    (8L, 3, "Hello world"),
+    (16L, 3, "Hello world"))
+
+  /**
+    * Resets the shared result buffer and creates the stream environment (time
+    * characteristic set to EventTime) together with its table environment.
+    *
+    * NOTE(review): the tests named "ProcessingTime..." use this EventTime setup as well,
+    * exactly as they did before - confirm that this is intended.
+    */
+  private def eventTimeEnvironments() = {
+    StreamITCase.testResults = mutable.MutableList()
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    (env, TableEnvironment.getTableEnvironment(env))
+  }
+
+  /** Compares the sorted expectation with the sorted rows collected by the sink. */
+  private def assertCollected(expected: Seq[String]): Unit =
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val (env, tEnv) = eventTimeEnvironments()
+
+    val slidingCounts = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'string)
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows)
+      .select('string, 'int.count, 'int.avg)
+
+    slidingCounts.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertCollected(
+      Seq("Hello world,1,3", "Hello world,2,3", "Hello,1,2", "Hello,2,2", "Hi,1,1"))
+  }
+
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val (env, tEnv) = eventTimeEnvironments()
+
+    val sessionCounts = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+      .toTable(tEnv, 'long, 'int, 'string)
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    sessionCounts.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertCollected(Seq("Hello world,1", "Hello world,1", "Hello,2", "Hi,1"))
+  }
+
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val (env, tEnv) = eventTimeEnvironments()
+
+    // no groupBy: the count window spans all rows of the table
+    val tumblingCounts = env
+      .fromCollection(data)
+      .toTable(tEnv, 'long, 'int, 'string)
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    tumblingCounts.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertCollected(Seq("2", "2"))
+  }
+
+  @Test
+  def testEventTimeTumblingWindow(): Unit = {
+    val (env, tEnv) = eventTimeEnvironments()
+
+    val tumblingAggregates = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+      .toTable(tEnv, 'long, 'int, 'string)
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'int.avg, 'w.start, 'w.end)
+
+    tumblingAggregates.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertCollected(Seq(
+      "Hello world,1,3,1970-01-01 00:00:00.005,1970-01-01 00:00:00.01",
+      "Hello world,1,3,1970-01-01 00:00:00.015,1970-01-01 00:00:00.02",
+      "Hello,2,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005",
+      "Hi,1,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.005"))
+  }
+
+  @Test
+  def testEventTimeSlidingWindow(): Unit = {
+    val (env, tEnv) = eventTimeEnvironments()
+
+    // 'w.start is selected twice, so each expected row carries the window start twice
+    val slidingAggregates = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+      .toTable(tEnv, 'long, 'int, 'string)
+      .groupBy('string)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end, 'w.start)
+
+    slidingAggregates.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertCollected(Seq(
+      "Hello world,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
+      "Hello world,1,1970-01-01 00:00:00.005,1970-01-01 00:00:00.015,1970-01-01 00:00:00.005",
+      "Hello world,1,1970-01-01 00:00:00.01,1970-01-01 00:00:00.02,1970-01-01 00:00:00.01",
+      "Hello world,1,1970-01-01 00:00:00.015,1970-01-01 00:00:00.025,1970-01-01 00:00:00.015",
+      "Hello,2,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
+      "Hello,2,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0",
+      "Hi,1,1969-12-31 23:59:59.995,1970-01-01 00:00:00.005,1969-12-31 23:59:59.995",
+      "Hi,1,1970-01-01 00:00:00.0,1970-01-01 00:00:00.01,1970-01-01 00:00:00.0"))
+  }
+}
+
+object GroupWindowITCase {
+
+  /**
+    * Timestamp and watermark assigner for (Long, Int, String) tuples. The first tuple
+    * field is used as the element timestamp, and every call to
+    * `checkAndGetNextWatermark` returns a watermark equal to the extracted timestamp.
+    */
+  class TimestampWithEqualWatermark
+    extends AssignerWithPunctuatedWatermarks[(Long, Int, String)] {
+
+    /** Returns the first tuple field; the previous timestamp is ignored. */
+    override def extractTimestamp(
+        element: (Long, Int, String),
+        previousElementTimestamp: Long): Long = element._1
+
+    /** Returns a watermark carrying exactly the timestamp extracted for the element. */
+    override def checkAndGetNextWatermark(
+        lastElement: (Long, Int, String),
+        extractedTimestamp: Long): Watermark = new Watermark(extractedTimestamp)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
new file mode 100644
index 0000000..f541eb4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/CalcITCase.scala
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Integration tests for projections (`select`, field naming in `toTable`) and filters
+  * of the streaming Table API. Each test writes its result rows to a
+  * `StreamITCase.StringSink` and compares the sorted contents of
+  * `StreamITCase.testResults` with the sorted expectation.
+  */
+class CalcITCase extends StreamingMultipleProgramsTestBase {
+
+  /**
+    * Resets the shared result buffer and returns the stream execution environment
+    * together with the table environment created for it.
+    */
+  private def newEnvironments() = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    (env, tEnv)
+  }
+
+  /** Compares the sorted expected rows with the sorted rows collected by the sink. */
+  private def assertResults(expected: String*): Unit =
+    assertEquals(mutable.MutableList(expected: _*).sorted, StreamITCase.testResults.sorted)
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults(
+      "1,1,Hi",
+      "2,2,Hello",
+      "3,2,Hello world")
+  }
+
+  @Test
+  def testSelectFirst(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults("1", "2", "3")
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    // verify ProjectMergeRule.
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults(
+      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
+      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
+      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults(
+      "1,1,Hi",
+      "2,2,Hello",
+      "3,2,Hello world")
+  }
+
+  /** Naming only two fields of the 3-tuple stream must raise a TableException. */
+  @Test(expected = classOf[TableException])
+  def testAsWithToFewFields(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // The test passes only through the expected exception, not through this assertion.
+    assertResults("no")
+  }
+
+  /** Naming four fields for the 3-tuple stream must raise a TableException. */
+  @Test(expected = classOf[TableException])
+  def testAsWithToManyFields(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // The test passes only through the expected exception, not through this assertion.
+    assertResults("no")
+  }
+
+  /** Using the field name 'b twice must raise a TableException. */
+  @Test(expected = classOf[TableException])
+  def testAsWithAmbiguousFields(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // The test passes only through the expected exception, not through this assertion.
+    assertResults("no")
+  }
+
+  /** An alias expression ('b as 'c) in the field list of toTable must raise a TableException. */
+  @Test(expected = classOf[TableException])
+  def testOnlyFieldRefInAs(): Unit = {
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
+
+    ds.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // The test passes only through the expected exception, not through this assertion.
+    assertResults("no")
+  }
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    /*
+     * Test simple filter
+     */
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a === 3)
+    filterDs.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults("3,2,Hello world")
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter
+     */
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+    filterDs.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // assertTrue with a message shows the unexpected rows instead of "expected true".
+    assertTrue(
+      s"Expected no result rows but got: ${StreamITCase.testResults}",
+      StreamITCase.testResults.isEmpty)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter
+     */
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    filterDs.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults(
+      "1,1,Hi",
+      "2,2,Hello",
+      "3,2,Hello world")
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+    filterDs.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults(
+      "2,2,Hello", "4,3,Hello world, how are you?",
+      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+      "18,6,Comment#12", "20,6,Comment#14")
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    /*
+     * Test filter with a not-equals (!==) predicate: keeps the rows whose 'a is odd.
+     */
+    val (env, tEnv) = newEnvironments()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 !== 0)
+    filterDs.toDataStream[Row].addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertResults(
+      "1,1,Hi", "3,2,Hello world",
+      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
new file mode 100644
index 0000000..ee24cf7
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -0,0 +1,735 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table._
+import org.apache.flink.table.api.{TableException, ValidationException}
+import org.apache.flink.table.expressions.{RowtimeAttribute, WindowReference}
+import org.apache.flink.table.plan.logical._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil.{streamTableNode, term, unaryNode}
+import org.junit.{Ignore, Test}
+
+class GroupWindowTest extends TableTestBase {
+
+  /**
+   * Batch windows are not supported yet, so defining a window on a batch table
+   * is expected to raise a ValidationException.
+   */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidBatchWindow(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    table
+      .groupBy('string)
+      // The alias must not clash with a field name; otherwise the expected exception
+      // could stem from alias validation (see testInvalidWindowAlias2) instead of
+      // from the unsupported batch window.
+      .window(Session withGap 100.milli as 'w)
+  }
+
+  /** Selecting a window property on a grouped table without a window must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowProperty(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    // 'string.start is a window property, but the table is not windowed
+    input
+      .groupBy('string)
+      .select('string, 'string.start)
+  }
+
+  /** Using 'rowtime as a field name when adding a table must raise a TableException. */
+  @Test(expected = classOf[TableException])
+  def testInvalidRowtime1(): Unit = {
+    // the rowtime attribute is not allowed as a field name
+    streamTestUtil().addTable[(Long, Int, String)]('rowtime, 'long, 'int, 'string)
+  }
+
+  /** Aliasing a selected field to 'rowtime must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime2(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    // the rowtime attribute is not allowed as an alias
+    input.select('string, 'int as 'rowtime)
+  }
+
+  /** Renaming a table field to 'rowtime via as(...) must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime3(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    // the rowtime attribute is not allowed as an alias
+    input.as('rowtime, 'myint, 'mystring)
+  }
+
+  /**
+   * A window defined on a field other than 'rowtime must fail validation, because
+   * only rowtime is a valid time attribute in a stream environment.
+   */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidRowtime4(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // 'string is used as time attribute here instead of 'rowtime
+    grouped
+      .window(Tumble over 50.milli on 'string)
+      .select('string, 'int.count)
+  }
+
+  /** A tumbling window whose size is a plain string must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidTumblingSize(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // a string is not a valid interval
+    grouped
+      .window(Tumble over "WRONG")
+      .select('string, 'int.count)
+  }
+
+  /** A sliding window whose size and slide are plain strings must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSize(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // a string is not a valid interval
+    grouped
+      .window(Slide over "WRONG" every "WRONG")
+      .select('string, 'int.count)
+  }
+
+  /** A sliding window that combines a row-count size with a time slide must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSlidingSlide(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // row and time intervals may not be mixed
+    grouped
+      .window(Slide over 12.rows every 1.minute)
+      .select('string, 'int.count)
+  }
+
+  /** A session window whose gap is given as a row interval must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSessionGap(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // a row interval is not valid for session windows
+    grouped
+      .window(Session withGap 10.rows)
+      .select('string, 'int.count)
+  }
+
+  /** A window alias that is an expression rather than a symbol must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias1(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // the alias is the expression 1 + 1 instead of a symbol
+    grouped
+      .window(Session withGap 100.milli as 1 + 1)
+      .select('string, 'int.count)
+  }
+
+  /** A window alias that reuses an existing field name must fail validation. */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidWindowAlias2(): Unit = {
+    val input = streamTestUtil().addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val grouped = input.groupBy('string)
+
+    // the field name "string" is already present in the table
+    grouped
+      .window(Session withGap 100.milli as 'string)
+      .select('string, 'int.count)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Tumble over 50.milli` and no
+   * time attribute; the expected plan carries a ProcessingTimeTumblingGroupWindow.
+   */
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Tumble over 50.milli)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Tumble over 2.rows` and no
+   * time attribute; the expected plan carries a ProcessingTimeTumblingGroupWindow.
+   */
+  @Test
+  def testProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Tumble over 2.rows)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Tumble over 5.milli on 'rowtime`;
+   * the expected plan carries an EventTimeTumblingGroupWindow.
+   */
+  @Test
+  def testEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Tumble over 2.rows on 'rowtime`.
+   * Currently ignored; the expected aggregate sits directly on the stream table node.
+   */
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Slide over 50.milli every 50.milli`
+   * and no time attribute; the expected plan carries a ProcessingTimeSlidingGroupWindow.
+   */
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Slide over 50.milli every 50.milli)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Slide over 2.rows every 1.rows`
+   * and no time attribute; the expected plan carries a ProcessingTimeSlidingGroupWindow.
+   */
+  @Test
+  def testProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with
+   * `Slide over 8.milli every 10.milli on 'rowtime`; the expected plan carries an
+   * EventTimeSlidingGroupWindow.
+   */
+  @Test
+  def testEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with
+   * `Slide over 2.rows every 1.rows on 'rowtime`. Currently ignored; the expected
+   * aggregate sits directly on the stream table node.
+   */
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('string, 'int.count)
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      streamTableNode(0),
+      term("groupBy", "string"),
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a table grouped by 'string with `Session withGap 7.milli on 'rowtime`;
+   * the expected plan carries an EventTimeSessionGroupWindow.
+   */
+  @Test
+  def testEventTimeSessionGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('string, 'int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "string", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Tumble over 50.milli` applied to the whole table, i.e. without a
+   * preceding groupBy and without a time attribute; the expected plan carries a
+   * ProcessingTimeTumblingGroupWindow and no groupBy term.
+   */
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverTime(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    // "All" variant: no groupBy here. With a groupBy this test would merely
+    // duplicate testProcessingTimeTumblingGroupWindowOverTime.
+    val windowedTable = table
+      .window(Tumble over 50.milli)
+      .select('int.count)
+
+    val expected = unaryNode(
+      "DataStreamAggregate",
+      unaryNode(
+        "DataStreamCalc",
+        streamTableNode(0),
+        term("select", "int")
+      ),
+      term("window", ProcessingTimeTumblingGroupWindow(None, 50.milli)),
+      term("select", "COUNT(int) AS TMP_0")
+    )
+
+    util.verifyTable(windowedTable, expected)
+  }
+
+  /**
+   * Plan check for `Tumble over 2.rows` applied without groupBy and without a time
+   * attribute; the expected plan carries a ProcessingTimeTumblingGroupWindow.
+   */
+  @Test
+  def testAllProcessingTimeTumblingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Tumble over 2.rows)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", ProcessingTimeTumblingGroupWindow(None, 2.rows)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Tumble over 5.milli on 'rowtime` applied without groupBy;
+   * the expected plan carries an EventTimeTumblingGroupWindow.
+   */
+  @Test
+  def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Tumble over 5.milli on 'rowtime)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 5.milli)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Tumble over 2.rows on 'rowtime` applied without groupBy.
+   * Currently ignored.
+   */
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeTumblingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Tumble over 2.rows on 'rowtime)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", EventTimeTumblingGroupWindow(None, RowtimeAttribute(), 2.rows)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+
+  /**
+   * Plan check for `Slide over 50.milli every 50.milli` applied without groupBy and
+   * without a time attribute; the expected plan carries a
+   * ProcessingTimeSlidingGroupWindow.
+   */
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Slide over 50.milli every 50.milli)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", ProcessingTimeSlidingGroupWindow(None, 50.milli, 50.milli)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Slide over 2.rows every 1.rows` applied without groupBy and
+   * without a time attribute; the expected plan carries a
+   * ProcessingTimeSlidingGroupWindow.
+   */
+  @Test
+  def testAllProcessingTimeSlidingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Slide over 2.rows every 1.rows)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", ProcessingTimeSlidingGroupWindow(None, 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Slide over 8.milli every 10.milli on 'rowtime` applied without
+   * groupBy; the expected plan carries an EventTimeSlidingGroupWindow.
+   */
+  @Test
+  def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Slide over 8.milli every 10.milli on 'rowtime)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 8.milli, 10.milli)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Slide over 2.rows every 1.rows on 'rowtime` applied without
+   * groupBy. Currently ignored.
+   */
+  @Test
+  @Ignore // see comments in DataStreamAggregate
+  def testAllEventTimeSlidingGroupWindowOverCount(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Slide over 2.rows every 1.rows on 'rowtime)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", EventTimeSlidingGroupWindow(None, RowtimeAttribute(), 2.rows, 1.rows)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for `Session withGap 7.milli on 'rowtime` applied without groupBy;
+   * the expected plan carries an EventTimeSessionGroupWindow.
+   */
+  @Test
+  def testAllEventTimeSessionGroupWindowOverTime(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .window(Session withGap 7.milli on 'rowtime)
+      .select('int.count)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("window", EventTimeSessionGroupWindow(None, RowtimeAttribute(), 7.milli)),
+      term("select", "COUNT(int) AS TMP_0"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a tumbling window aliased as 'w whose start and end properties
+   * are selected next to the count aggregate.
+   */
+  @Test
+  def testTumbleWindowStartEnd(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Tumble over 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term(
+        "window",
+        EventTimeTumblingGroupWindow(Some(WindowReference("w")), RowtimeAttribute(), 5.milli)),
+      term(
+        "select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a sliding window aliased as 'w whose start and end properties
+   * are selected next to the count aggregate.
+   */
+  @Test
+  def testSlideWindowStartEnd(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
+      .select('string, 'int.count, 'w.start, 'w.end)
+
+    // projection expected below the aggregate
+    val calcNode = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val expectedPlan = unaryNode(
+      "DataStreamAggregate",
+      calcNode,
+      term("groupBy", "string"),
+      term(
+        "window",
+        EventTimeSlidingGroupWindow(
+          Some(WindowReference("w")),
+          RowtimeAttribute(),
+          10.milli,
+          5.milli)),
+      term(
+        "select",
+        "string",
+        "COUNT(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a session window aliased as 'w where 'w.end is selected twice
+   * under different aliases; the expected aggregate lists the end property once
+   * and the outer calc maps it to both aliases.
+   */
+  @Test
+  def testSessionWindowStartWithTwoEnd(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Session withGap 3.milli on 'rowtime as 'w)
+      .select('w.end as 'we1, 'string, 'int.count as 'cnt, 'w.start as 'ws, 'w.end as 'we2)
+
+    // projection expected below the aggregate
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val aggregate = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("groupBy", "string"),
+      term(
+        "window",
+        EventTimeSessionGroupWindow(Some(WindowReference("w")), RowtimeAttribute(), 3.milli)),
+      term(
+        "select",
+        "string",
+        "COUNT(int) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_2"))
+
+    // outer calc that applies the user-facing aliases
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      aggregate,
+      term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+
+  /**
+   * Plan check for a tumbling window aliased as 'w where 'int.sum and the window
+   * start/end properties each occur more than once in the select; the expected
+   * aggregate lists SUM(int), start and end once each.
+   */
+  @Test
+  def testTumbleWindowWithDuplicateAggsAndProps(): Unit = {
+    val testUtil = streamTestUtil()
+    val source = testUtil.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+    val query = source
+      .groupBy('string)
+      .window(Tumble over 5.millis on 'rowtime as 'w)
+      .select('string, 'int.sum + 1 as 's1, 'int.sum + 3 as 's2, 'w.start as 'x, 'w.start as 'x2,
+        'w.end as 'x3, 'w.end)
+
+    // projection expected below the aggregate
+    val inputCalc = unaryNode(
+      "DataStreamCalc",
+      streamTableNode(0),
+      term("select", "string", "int"))
+
+    val aggregate = unaryNode(
+      "DataStreamAggregate",
+      inputCalc,
+      term("groupBy", "string"),
+      term(
+        "window",
+        EventTimeTumblingGroupWindow(Some(WindowReference("w")), RowtimeAttribute(), 5.millis)),
+      term(
+        "select",
+        "string",
+        "SUM(int) AS TMP_0",
+        "start(WindowReference(w)) AS TMP_1",
+        "end(WindowReference(w)) AS TMP_2"))
+
+    // outer calc that derives the aliased output columns from the aggregate's fields
+    val expectedPlan = unaryNode(
+      "DataStreamCalc",
+      aggregate,
+      term(
+        "select",
+        "string",
+        "+(CAST(AS(TMP_0, 'TMP_3')), CAST(1)) AS s1",
+        "+(CAST(AS(TMP_0, 'TMP_4')), CAST(3)) AS s2",
+        "TMP_1 AS x",
+        "TMP_1 AS x2",
+        "TMP_2 AS x3",
+        "TMP_2 AS TMP_5"))
+
+    testUtil.verifyTable(query, expectedPlan)
+  }
+}