You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:18 UTC
[09/44] flink git commit: [FLINK-6617][table] Improve consistency tests for
Java and Scala logical plans
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
deleted file mode 100644
index 3d1704e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.table.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class UnionITCase extends StreamingMultipleProgramsTestBase {
-
- @Test
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f)
-
- val unionDs = ds1.unionAll(ds2).select('c)
-
- val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList(
- "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test
- def testUnionWithFilter(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
- val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
-
- val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- val expected = mutable.MutableList("Hi", "Hallo")
- assertEquals(expected.sorted, StreamITCase.testResults.sorted)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionFieldsNameNotOverlap1(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
- val unionDs = ds1.unionAll(ds2)
-
- val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionFieldsNameNotOverlap2(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
-
- StreamITCase.testResults = mutable.MutableList()
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
- val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
- .select('a, 'b, 'c)
-
- val unionDs = ds1.unionAll(ds2)
-
- val results = unionDs.toAppendStream[Row]
- results.addSink(new StreamITCase.StringSink[Row])
- env.execute()
-
- assertEquals(true, StreamITCase.testResults.isEmpty)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnionTablesFromDifferentEnvs(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv1 = TableEnvironment.getTableEnvironment(env)
- val tEnv2 = TableEnvironment.getTableEnvironment(env)
-
- val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
- val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
-
- // Must fail. Tables are bound to different TableEnvironments.
- ds1.unionAll(ds2)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
deleted file mode 100644
index 0e33d8b..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.stream.table
-
-import org.apache.flink.table.api.scala.stream.utils.StreamTestData
-import org.apache.flink.table.api.scala._
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.junit.Test
-
-class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
-
- @Test(expected = classOf[ValidationException])
- def testSort(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).orderBy('_1.desc)
- }
-
- @Test(expected = classOf[ValidationException])
- def testJoin(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.join(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testUnion(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.union(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testIntersect(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.intersect(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testIntersectAll(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.intersectAll(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testMinus(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.minus(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testMinusAll(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- val t2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.minusAll(t2)
- }
-
- @Test(expected = classOf[ValidationException])
- def testLimit(): Unit = {
- val env = StreamExecutionEnvironment.getExecutionEnvironment
- val tEnv = TableEnvironment.getTableEnvironment(env)
- val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv)
- t1.limit(0,5)
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
index 3edfd8c..bb9877b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -18,257 +18,14 @@
package org.apache.flink.table.api.scala.stream.table
import org.apache.flink.api.scala._
-import org.apache.flink.types.Row
-import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api._
-import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils._
-import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
-import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
-import org.junit.Assert.{assertTrue, fail}
import org.junit.Test
-import org.mockito.Mockito._
class UserDefinedTableFunctionTest extends TableTestBase {
@Test
- def testJavaScalaTableAPIEquality(): Unit = {
- // mock
- val ds = mock(classOf[DataStream[Row]])
- val jDs = mock(classOf[JDataStream[Row]])
- val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
- when(ds.javaStream).thenReturn(jDs)
- when(jDs.getType).thenReturn(typeInfo)
-
- // Scala environment
- val env = mock(classOf[ScalaExecutionEnv])
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
- // Java environment
- val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
- // test cross join
- val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
- var scalaTable = in1.join(func1('c) as 's).select('c, 's)
- var javaTable = in2.join(new Table(javaTableEnv, "func1(c).as(s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test left outer join
- scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
- javaTable = in2.leftOuterJoin(new Table(javaTableEnv, "func1(c)").as("s")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test overloading
- scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
- javaTable = in2.join(new Table(javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // test custom result type
- val func2 = new TableFunc2
- javaTableEnv.registerFunction("func2", func2)
- scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
- javaTable = in2.join(new Table(javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
- verifyTableEquals(scalaTable, javaTable)
-
- // test hierarchy generic type
- val hierarchy = new HierarchyTableFunction
- javaTableEnv.registerFunction("hierarchy", hierarchy)
- scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
- .select('c, 'name, 'len, 'adult)
- javaTable = in2.join(new Table(javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
- .select("c, name, len, adult")
- verifyTableEquals(scalaTable, javaTable)
-
- // test pojo type
- val pojo = new PojoTableFunc
- javaTableEnv.registerFunction("pojo", pojo)
- scalaTable = in1.join(pojo('c))
- .select('c, 'name, 'age)
- javaTable = in2.join(new Table(javaTableEnv, "pojo(c)"))
- .select("c, name, age")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with filter
- scalaTable = in1.join(func2('c) as ('name, 'len))
- .select('c, 'name, 'len).filter('len > 2)
- javaTable = in2.join(new Table(javaTableEnv, "func2(c) as (name, len)"))
- .select("c, name, len").filter("len > 2")
- verifyTableEquals(scalaTable, javaTable)
-
- // test with scalar function
- scalaTable = in1.join(func1('c.substring(2)) as 's)
- .select('a, 'c, 's)
- javaTable = in2.join(new Table(javaTableEnv, "func1(substring(c, 2)) as (s)"))
- .select("a, c, s")
- verifyTableEquals(scalaTable, javaTable)
-
- // check scala object is forbidden
- expectExceptionThrown(
- tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
- expectExceptionThrown(
- javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
- expectExceptionThrown(in1.join(ObjectTableFunction('a, 1)), "Scala object")
-
- }
-
- @Test
- def testInvalidTableFunctions(): Unit = {
- // mock
- val ds = mock(classOf[DataStream[Row]])
- val jDs = mock(classOf[JDataStream[Row]])
- val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
- when(ds.javaStream).thenReturn(jDs)
- when(jDs.getType).thenReturn(typeInfo)
-
- // Scala environment
- val env = mock(classOf[ScalaExecutionEnv])
- val tableEnv = TableEnvironment.getTableEnvironment(env)
- val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
-
- // Java environment
- val javaEnv = mock(classOf[JavaExecutionEnv])
- val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
- val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
-
- val func1 = new TableFunc1
- javaTableEnv.registerFunction("func1", func1)
-
- // table function call select
- expectExceptionThrown(
- func1('c).select("f0"),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call select
- expectExceptionThrown(
- func1('c).select('f0),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call writeToSink
- expectExceptionThrown(
- func1('c).writeToSink(null),
- "Cannot translate a query with an unbounded table function call."
- )
-
- // table function call distinct
- expectExceptionThrown(
- func1('c).distinct(),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call filter
- expectExceptionThrown(
- func1('c).filter('f0 === "?"),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call filter
- expectExceptionThrown(
- func1('c).filter("f0 = '?'"),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call limit
- expectExceptionThrown(
- func1('c).orderBy('f0).limit(3),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call limit
- expectExceptionThrown(
- func1('c).orderBy('f0).limit(0, 3),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call orderBy
- expectExceptionThrown(
- func1('c).orderBy("f0"),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call orderBy
- expectExceptionThrown(
- func1('c).orderBy('f0),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call where
- expectExceptionThrown(
- func1('c).where("f0 = '?'"),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- // table function call where
- expectExceptionThrown(
- func1('c).where('f0 === "?"),
- "TableFunction can only be used in join and leftOuterJoin."
- )
-
- }
-
- @Test
- def testInvalidTableFunction(): Unit = {
- // mock
- val util = streamTestUtil()
- val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
- val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
-
- //=================== check scala object is forbidden =====================
- // Scala table environment register
- expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
- // Java table environment register
- expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
- // Scala Table API directly call
- expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
-
-
- //============ throw exception when table function is not registered =========
- // Java Table API call
- expectExceptionThrown(
- t.join(new Table(util.tEnv, "nonexist(a)")
- ), "Undefined function: NONEXIST")
- // SQL API call
- expectExceptionThrown(
- util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
- "No match found for function signature nonexist(<NUMERIC>)")
-
-
- //========= throw exception when the called function is a scalar function ====
- util.tEnv.registerFunction("func0", Func0)
-
- // Java Table API call
- expectExceptionThrown(
- t.join(new Table(util.tEnv, "func0(a)")),
- "only accept String that define table function",
- classOf[TableException])
- // SQL API call
- // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
- expectExceptionThrown(
- util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
- null,
- classOf[AssertionError])
-
- //========== throw exception when the parameters is not correct ===============
- // Java Table API call
- util.addFunction("func2", new TableFunc2)
- expectExceptionThrown(
- t.join(new Table(util.tEnv, "func2(c, c)")),
- "Given parameters of function 'FUNC2' do not match any signature")
- // SQL API call
- expectExceptionThrown(
- util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
- "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
- }
-
- @Test
def testCrossJoin(): Unit = {
val util = streamTestUtil()
val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
@@ -459,25 +216,4 @@ class UserDefinedTableFunctionTest extends TableTestBase {
util.verifyTable(result, expected)
}
- // ----------------------------------------------------------------------------------------------
-
- private def expectExceptionThrown(
- function: => Unit,
- keywords: String,
- clazz: Class[_ <: Throwable] = classOf[ValidationException])
- : Unit = {
- try {
- function
- fail(s"Expected a $clazz, but no exception is thrown.")
- } catch {
- case e if e.getClass == clazz =>
- if (keywords != null) {
- assertTrue(
- s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
- e.getMessage.contains(keywords))
- }
- case e: Throwable => fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
- }
- }
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..b558193
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class CalcStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testSimpleSelect(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]()
+
+ val resScala = t.select('_1, '_2)
+ val resJava = t.select("_1, _2")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testSelectStar(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.select('*)
+ val resJava = t.select("*")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testSelectWithWhere(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+ val resScala = t.where('string === "true").select('int)
+ val resJava = t.where("string === 'true'").select("int")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testSimpleSelectWithNaming(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.select('int, 'string)
+ val resJava = t.select("int, string")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testSimpleSelectWithAlias(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+ val resScala = t.select('int as 'myInt, 'string as 'myString)
+ val resJava = t.select("int as myInt, string as myString")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testSimpleFilter(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.filter('int === 3).select('int as 'myInt, 'string)
+ val resJava = t.filter("int === 3").select("int as myInt, string")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testAllRejectingFilter(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.filter(Literal(false)).select('int as 'myInt, 'string)
+ val resJava = t.filter("false").select("int as myInt, string")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testAllPassingFilter(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.filter(Literal(true)).select('int as 'myInt, 'string)
+ val resJava = t.filter("true").select("int as myInt, string")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testNotEqualsFilter(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.filter('int !== 2).filter('string.like("%world%")).select('int, 'string)
+ val resJava = t.filter("int !== 2").filter("string.like('%world%')").select("int, string")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testFilterWithExpression(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t.filter('int % 2 === 0).select('int, 'string)
+ val resJava = t.filter("int % 2 === 0").select("int, string")
+ verifyTableEquals(resJava, resScala)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala
new file mode 100644
index 0000000..fb23f09
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupAggregationsStringExpressionTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
+import org.junit.Test
+
+class GroupAggregationsStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testGroupedAggregate(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+ // Expression / Scala API
+ val resScala = t
+ .groupBy('string)
+ .select('int.count as 'cnt, weightAvgFun('long, 'int))
+
+ // String / Java API
+ val resJava = t
+ .groupBy("string")
+ .select("int.count as cnt, weightAvgFun(long, int)")
+
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testNonGroupedAggregate(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ // Expression / Scala API
+ val resScala = t.select('int.count as 'cnt, 'long.sum)
+
+ // String / Java API
+ val resJava = t.select("int.count as cnt, long.sum")
+
+ verifyTableEquals(resJava, resScala)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
index 1cc156e..bb0b121 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/GroupWindowStringExpressionTest.scala
@@ -18,9 +18,9 @@
package org.apache.flink.table.api.scala.stream.table.stringexpr
import org.apache.flink.api.scala._
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
-import org.apache.flink.table.api.java.{Slide => JSlide}
import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
+import org.apache.flink.table.api.java.{Slide => JSlide, Session => JSession, Tumble => JTumble}
import org.apache.flink.table.functions.aggfunctions.CountAggFunction
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test
@@ -29,14 +29,14 @@ import org.junit.Test
class GroupWindowStringExpressionTest extends TableTestBase {
@Test
- def testJavaScalaTableAPIEquality(): Unit = {
+ def testRowTimeSlide(): Unit = {
val util = streamTestUtil()
val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
val myCountFun = new CountAggFunction
- util.tEnv.registerFunction("myCountFun", myCountFun)
+ util.tableEnv.registerFunction("myCountFun", myCountFun)
val weightAvgFun = new WeightedAvg
- util.tEnv.registerFunction("weightAvgFun", weightAvgFun)
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
// Expression / Scala API
val resScala = t
@@ -66,4 +66,193 @@ class GroupWindowStringExpressionTest extends TableTestBase {
verifyTableEquals(resJava, resScala)
}
+
+ @Test
+ def testRowTimeTumble(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, Long, String)]('int, 'long, 'rowtime.rowtime, 'string)
+
+ val myCountFun = new CountAggFunction
+ util.tableEnv.registerFunction("myCountFun", myCountFun)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Tumble over 4.hours on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select(
+ 'string,
+ myCountFun('string),
+ 'int.sum,
+ weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int) * 2,
+ 'w.start,
+ 'w.end)
+
+ // String / Java API
+ val resJava = t
+ .window(JTumble.over("4.hours").on("rowtime").as("w"))
+ .groupBy("w, string")
+ .select(
+ "string, " +
+ "myCountFun(string), " +
+ "int.sum, " +
+ "weightAvgFun(long, int), " +
+ "weightAvgFun(int, int) * 2, " +
+ "start(w)," +
+ "end(w)")
+
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testRowTimeSession(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'rowtime.rowtime)
+
+ val myCountFun = new CountAggFunction
+ util.tableEnv.registerFunction("myCountFun", myCountFun)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Session withGap 4.hours on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select(
+ 'string,
+ myCountFun('string),
+ 'int.sum,
+ weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int) * 2,
+ 'w.start)
+
+ // String / Java API
+ val resJava = t
+ .window(JSession.withGap("4.hours").on("rowtime").as("w"))
+ .groupBy("w, string")
+ .select(
+ "string, " +
+ "myCountFun(string), " +
+ "int.sum, " +
+ "weightAvgFun(long, int), " +
+ "weightAvgFun(int, int) * 2, " +
+ "start(w)"
+ )
+
+ verifyTableEquals(resJava, resScala)
+ }
+ @Test
+ def testProcTimeSlide(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'proctime.proctime)
+
+ val myCountFun = new CountAggFunction
+ util.tableEnv.registerFunction("myCountFun", myCountFun)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Slide over 4.hours every 2.hours on 'proctime as 'w)
+ .groupBy('w)
+ .select(
+ myCountFun('string),
+ 'int.sum,
+ weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int) * 2,
+ 'w.start,
+ 'w.end)
+
+ // String / Java API
+ val resJava = t
+ .window(JSlide.over("4.hours").every("2.hours").on("proctime").as("w"))
+ .groupBy("w")
+ .select(
+ "myCountFun(string), " +
+ "int.sum, " +
+ "weightAvgFun(long, int), " +
+ "weightAvgFun(int, int) * 2, " +
+ "start(w)," +
+ "end(w)")
+
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testProcTimeTumble(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long,'string, 'proctime.proctime)
+
+ val myCountFun = new CountAggFunction
+ util.tableEnv.registerFunction("myCountFun", myCountFun)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Tumble over 4.hours on 'proctime as 'w)
+ .groupBy('w)
+ .select(
+ myCountFun('string),
+ 'int.sum,
+ weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int) * 2,
+ 'w.start,
+ 'w.end)
+
+ // String / Java API
+ val resJava = t
+ .window(JTumble.over("4.hours").on("proctime").as("w"))
+ .groupBy("w")
+ .select(
+ "myCountFun(string), " +
+ "int.sum, " +
+ "weightAvgFun(long, int), " +
+ "weightAvgFun(int, int) * 2, " +
+ "start(w)," +
+ "end(w)")
+
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testProcTimeSession(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Int, Long, String)]('int, 'long, 'string, 'proctime.proctime)
+
+ val myCountFun = new CountAggFunction
+ util.tableEnv.registerFunction("myCountFun", myCountFun)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
+ // Expression / Scala API
+ val resScala = t
+ .window(Session withGap 4.hours on 'proctime as 'w)
+ .groupBy('w)
+ .select(
+ myCountFun('string),
+ 'int.sum,
+ weightAvgFun('long, 'int),
+ weightAvgFun('int, 'int) * 2,
+ 'w.start,
+ 'w.end)
+
+ // String / Java API
+ val resJava = t
+ .window(JSession.withGap("4.hours").on("proctime").as("w"))
+ .groupBy("w")
+ .select(
+ "myCountFun(string), " +
+ "int.sum, " +
+ "weightAvgFun(long, int), " +
+ "weightAvgFun(int, int) * 2, " +
+ "start(w), " +
+ "end(w)"
+ )
+
+ verifyTableEquals(resJava, resScala)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
index 4c95916..9430514 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/OverWindowStringExpressionTest.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.scala.stream.table.stringexpr
import org.apache.flink.api.scala._
import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvg
import org.apache.flink.table.api.java.{Over => JOver}
import org.apache.flink.table.api.scala.{Over => SOver, _}
import org.apache.flink.table.expressions.utils.Func1
@@ -33,12 +34,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w as 'cnt, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w as cnt, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -48,12 +52,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver orderBy 'rowtime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.orderBy("rowtime").preceding("unbounded_row").following("current_row").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -63,12 +70,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver partitionBy('a, 'd) orderBy 'rowtime preceding 10.rows as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.partitionBy("a, d").orderBy("rowtime").preceding("10.rows").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -78,12 +88,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver orderBy 'rowtime preceding 10.rows following CURRENT_ROW as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.orderBy("rowtime").preceding("10.rows").following("current_row").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -93,12 +106,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -108,13 +124,16 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver orderBy 'rowtime preceding UNBOUNDED_RANGE following CURRENT_RANGE as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(
JOver.orderBy("rowtime").preceding("unbounded_range").following("current_range").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -124,12 +143,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver partitionBy('a, 'c) orderBy 'rowtime preceding 10.minutes as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.partitionBy("a, c").orderBy("rowtime").preceding("10.minutes").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -139,12 +161,15 @@ class OverWindowStringExpressionTest extends TableTestBase {
val util = streamTestUtil()
val t = util.addTable[(Long, Int, String, Int, Long)]('a, 'b, 'c, 'd, 'e, 'rowtime.rowtime)
+ val weightAvgFun = new WeightedAvg
+ util.tableEnv.registerFunction("weightAvgFun", weightAvgFun)
+
val resScala = t
.window(SOver orderBy 'rowtime preceding 4.hours following CURRENT_RANGE as 'w)
- .select('a, 'b.sum over 'w)
+ .select('a, 'b.sum over 'w, weightAvgFun('a, 'b) over 'w as 'myCnt)
val resJava = t
.window(JOver.orderBy("rowtime").preceding("4.hours").following("current_range").as("w"))
- .select("a, SUM(b) OVER w")
+ .select("a, SUM(b) OVER w, weightAvgFun(a, b) over w as myCnt")
verifyTableEquals(resScala, resJava)
}
@@ -181,6 +206,4 @@ class OverWindowStringExpressionTest extends TableTestBase {
verifyTableEquals(resScala, resJava)
}
-
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala
new file mode 100644
index 0000000..dc2e92f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UnionStringExpressionTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class UnionStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testUnionAll(): Unit = {
+ val util = streamTestUtil()
+ val t1 = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+ val t2 = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+
+ val resScala = t1.unionAll(t2).select('int)
+ val resJava = t1.unionAll(t2).select("int")
+ verifyTableEquals(resJava, resScala)
+ }
+
+ @Test
+ def testUnionAllWithFilter(): Unit = {
+ val util = streamTestUtil()
+ val t1 = util.addTable[(Int, Long, String)]('int, 'long, 'string)
+ val t2 = util.addTable[(Int, Long, Double, String)]('int, 'long, 'double, 'string)
+
+ val resScala = t1.unionAll(t2.select('int, 'long, 'string)).filter('int < 2).select('int)
+ val resJava = t1.unionAll(t2.select("int, long, string")).filter("int < 2").select("int")
+ verifyTableEquals(resJava, resScala)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
new file mode 100644
index 0000000..2f0c57c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/stringexpr/UserDefinedTableFunctionStringExpressionTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.stringexpr
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.utils._
+import org.apache.flink.types.Row
+import org.junit.Test
+
+class UserDefinedTableFunctionStringExpressionTest extends TableTestBase {
+
+ @Test
+ def testJoin(): Unit = {
+
+ val util = streamTestUtil()
+ val sTab = util.addTable[(Int, Long, String)]('a, 'b, 'c)
+ val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+ val jTab = util.addJavaTable[Row](typeInfo,"MyTab","a, b, c")
+
+ // test cross join
+ val func1 = new TableFunc1
+ util.javaTableEnv.registerFunction("func1", func1)
+ var scalaTable = sTab.join(func1('c) as 's).select('c, 's)
+ var javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c).as(s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test left outer join
+ scalaTable = sTab.leftOuterJoin(func1('c) as 's).select('c, 's)
+ javaTable = jTab.leftOuterJoin(new Table(util.javaTableEnv, "func1(c)").as("s")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test overloading
+ scalaTable = sTab.join(func1('c, "$") as 's).select('c, 's)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "func1(c, '$') as (s)")).select("c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test custom result type
+ val func2 = new TableFunc2
+ util.javaTableEnv.registerFunction("func2", func2)
+ scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+ javaTable = jTab.join(
+ new Table(util.javaTableEnv, "func2(c).as(name, len)")).select("c, name, len")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test hierarchy generic type
+ val hierarchy = new HierarchyTableFunction
+ util.javaTableEnv.registerFunction("hierarchy", hierarchy)
+ scalaTable = sTab.join(hierarchy('c) as ('name, 'adult, 'len)).select('c, 'name, 'len, 'adult)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "AS(hierarchy(c), name, adult, len)"))
+ .select("c, name, len, adult")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test pojo type
+ val pojo = new PojoTableFunc
+ util.javaTableEnv.registerFunction("pojo", pojo)
+ scalaTable = sTab.join(pojo('c)).select('c, 'name, 'age)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "pojo(c)")).select("c, name, age")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test with filter
+ scalaTable = sTab.join(func2('c) as ('name, 'len)).select('c, 'name, 'len).filter('len > 2)
+ javaTable = jTab.join(new Table(util.javaTableEnv, "func2(c) as (name, len)"))
+ .select("c, name, len").filter("len > 2")
+ verifyTableEquals(scalaTable, javaTable)
+
+ // test with scalar function
+ scalaTable = sTab.join(func1('c.substring(2)) as 's).select('a, 'c, 's)
+ javaTable = jTab.join(
+ new Table(util.javaTableEnv, "func1(substring(c, 2)) as (s)")).select("a, c, s")
+ verifyTableEquals(scalaTable, javaTable)
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..879fca3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/CalcValidationTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.api.TableException
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testAsWithToManyFields(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("no")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testAsWithAmbiguousFields(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("no")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testOnlyFieldRefInAs(): Unit = {
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+ val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
+
+ val results = ds.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ val expected = mutable.MutableList("no")
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala
new file mode 100644
index 0000000..5cec6fc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupAggregationsValidationTest.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupAggregationsValidationTest extends TableTestBase {
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testOverAggregation(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+ val overAgg = new OverAgg0
+ table.select(overAgg('a, 'b))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingOnNonExistentField(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val ds = table
+ // must fail. '_foo is not a valid field
+ .groupBy('_foo)
+ .select('a.avg)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupingInvalidSelection(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('a, 'b, 'c)
+
+ val ds = table
+ .groupBy('a, 'b)
+ // must fail. 'c is not a grouping key or aggregation
+ .select('c)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala
new file mode 100644
index 0000000..e046986
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/GroupWindowAggregationsValidationTest.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class GroupWindowAggregationsValidationTest extends TableTestBase {
+
+ /**
+ * OVER clause is necessary for [[OverAgg0]] window function.
+ */
+ @Test(expected = classOf[ValidationException])
+ def testOverAggregation(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+ val overAgg = new OverAgg0
+ table
+ .window(Tumble over 2.rows on 'proctime as 'w)
+ .groupBy('w, 'string)
+ .select(overAgg('long, 'int))
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidWindowProperty(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ table
+ .groupBy('string)
+ .select('string, 'string.start) // property in non windowed table
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testGroupByWithoutWindowAlias(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidRowTimeRef(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ .window(Tumble over 5.milli on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+ .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int does not exist in input.
+ .groupBy('w2)
+ .select('string)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidTumblingSize(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ .window(Tumble over "WRONG" on 'long as 'w) // string is not a valid interval
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testTumbleUdAggWithInvalidArgs(): Unit = {
+ val util = streamTestUtil()
+ val weightedAvg = new WeightedAvgWithMerge
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ table
+ .window(Tumble over 2.hours on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidSlidingSize(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ .window(Slide over "WRONG" every "WRONG" on 'long as 'w) // string is not a valid interval
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidSlidingSlide(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ // row and time intervals may not be mixed
+ .window(Slide over 12.rows every 1.minute on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSlideUdAggWithInvalidArgs(): Unit = {
+ val util = streamTestUtil()
+ val weightedAvg = new WeightedAvgWithMerge
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
+
+ table
+ .window(Slide over 2.hours every 30.minutes on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidSessionGap(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ // row interval is not valid for session windows
+ .window(Session withGap 10.rows on 'long as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidWindowAlias1(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ .window(Session withGap 100.milli on 'long as 1 + 1) // expression instead of a symbol
+ .groupBy('string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidWindowAlias2(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long.rowtime, 'int, 'string)
+
+ table
+ // field name "string" is already present
+ .window(Session withGap 100.milli on 'long as 'string)
+ .groupBy('string)
+ .select('string, 'int.count)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testSessionUdAggWithInvalidArgs(): Unit = {
+ val util = streamTestUtil()
+ val weightedAvg = new WeightedAvgWithMerge
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
+
+ table
+ .window(Session withGap 2.hours on 'rowtime as 'w)
+ .groupBy('w, 'string)
+ .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidWindowPropertyOnRowCountsTumblingWindow(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+ table
+ .window(Tumble over 2.rows on 'proctime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'w.start, 'w.end) // invalid start/end on rows-count window
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidWindowPropertyOnRowCountsSlidingWindow(): Unit = {
+ val util = streamTestUtil()
+ val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
+
+ table
+ .window(Slide over 10.rows every 5.rows on 'proctime as 'w)
+ .groupBy('w, 'string)
+ .select('string, 'w.start, 'w.end) // invalid start/end on rows-count window
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala
new file mode 100644
index 0000000..cb84ae4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/OverWindowValidationTest.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithRetract
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{Table, ValidationException}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.junit.Test
+
+class OverWindowValidationTest extends TableTestBase {
+ private val streamUtil: StreamTableTestUtil = streamTestUtil()
+ val table: Table = streamUtil.addTable[(Int, String, Long)]("MyTable",
+ 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidWindowAlias(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
+ .select('c, 'b.count over 'x)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testOrderBy(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'abc preceding 2.rows as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testPrecedingAndFollowingUsingIsLiteral(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2 following "xx" as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testPrecedingAndFollowingUsingSameType(): Unit = {
+ val result = table
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_RANGE as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testPartitionByWithUnresolved(): Unit = {
+ val result = table
+ .window(Over partitionBy 'a + 'b orderBy 'rowtime preceding 2.rows as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testPartitionByWithNotKeyType(): Unit = {
+ val table2 = streamUtil.addTable[(Int, String, Either[Long, String])]("MyTable2", 'a, 'b, 'c)
+
+ val result = table2
+ .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testPrecedingValue(): Unit = {
+ val result = table
+ .window(Over orderBy 'rowtime preceding -1.rows as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testFollowingValue(): Unit = {
+ val result = table
+ .window(Over orderBy 'rowtime preceding 1.rows following -2.rows as 'w)
+ .select('c, 'b.count over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUdAggWithInvalidArgs(): Unit = {
+ val weightedAvg = new WeightedAvgWithRetract
+
+ val result = table
+ .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+ .select('c, weightedAvg('b, 'a) over 'w)
+ streamUtil.tableEnv.optimize(result.getRelNode, updatesAsRetraction = true)
+ }
+
+ @Test
+ def testAccessesWindowProperties(): Unit = {
+ thrown.expect(classOf[ValidationException])
+ thrown.expectMessage("Window start and end properties are not available for Over windows.")
+
+ table
+ .window(Over orderBy 'rowtime preceding 1.minutes as 'w)
+ .select('c, 'a.count over 'w, 'w.start, 'w.end)
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala
new file mode 100644
index 0000000..72fb0b6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSinksValidationTest.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.{TableEnvironment, TableException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.runtime.datastream.table.{TestAppendSink, TestUpsertSink}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class TableSinksValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testAppendSinkOnUpdatingTable(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'id, 'num, 'text)
+
+ t.groupBy('text)
+ .select('text, 'id.count, 'num.sum)
+ .writeToSink(new TestAppendSink)
+
+ // must fail because table is not append-only
+ env.execute()
+ }
+
+ @Test(expected = classOf[TableException])
+ def testUpsertSinkOnUpdatingTableWithoutFullKey(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ val t = StreamTestData.get3TupleDataStream(env)
+ .assignAscendingTimestamps(_._1.toLong)
+ .toTable(tEnv, 'id, 'num, 'text)
+
+ t.select('id, 'num, 'text.charLength() as 'len, ('id > 0) as 'cTrue)
+ .groupBy('len, 'cTrue)
+ .select('len, 'id.count, 'num.sum)
+ .writeToSink(new TestUpsertSink(Array("len", "cTrue"), false))
+
+ // must fail because table is updating table without full key
+ env.execute()
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala
new file mode 100644
index 0000000..bcfa146
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TableSourceValidationTest.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.table.{TestProctimeSource, TestRowtimeSource}
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class TableSourceValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[TableException])
+ def testRowtimeTableSourceWithEmptyName(): Unit = {
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource("rowTimeT", new TestRowtimeSource(" "))
+
+ val t = util.tableEnv.scan("rowTimeT")
+ .select('id)
+
+ util.tableEnv.optimize(t.getRelNode, false)
+ }
+
+ @Test(expected = classOf[TableException])
+ def testProctimeTableSourceWithEmptyName(): Unit = {
+ val util = streamTestUtil()
+ util.tableEnv.registerTableSource("procTimeT", new TestProctimeSource(" "))
+
+ val t = util.tableEnv.scan("procTimeT")
+ .select('id)
+
+ util.tableEnv.optimize(t.getRelNode, false)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala
new file mode 100644
index 0000000..7a684d1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/TimeAttributesValidationTest.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import java.math.BigDecimal
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.TimeCharacteristic
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala.{Tumble, _}
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.runtime.datastream.table.TimeAttributesITCase._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+class TimeAttributesValidationTest extends TableTestBase {
+
+ val data = List(
+ (1L, 1, 1d, 1f, new BigDecimal("1"), "Hi"),
+ (2L, 2, 2d, 2f, new BigDecimal("2"), "Hallo"),
+ (3L, 2, 2d, 2f, new BigDecimal("2"), "Hello"),
+ (4L, 5, 5d, 5f, new BigDecimal("5"), "Hello"),
+ (7L, 3, 3d, 3f, new BigDecimal("3"), "Hello"),
+ (8L, 3, 3d, 3f, new BigDecimal("3"), "Hello world"),
+ (16L, 4, 4d, 4f, new BigDecimal("4"), "Hello world"))
+
+ @Test(expected = classOf[TableException])
+ def testInvalidTimeCharacteristic(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidUseOfRowtime(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ .select('rowtime.rowtime)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidUseOfRowtime2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ stream
+ .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ .window(Tumble over 2.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala
new file mode 100644
index 0000000..1767d9f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/validation/UnionValidationTest.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.runtime.datastream.StreamITCase
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.types.Row
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+import scala.collection.mutable
+
+class UnionValidationTest extends TableTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionFieldsNameNotOverlap1(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+ val unionDs = ds1.unionAll(ds2)
+
+ val results = unionDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ assertEquals(true, StreamITCase.testResults.isEmpty)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionFieldsNameNotOverlap2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+
+ StreamITCase.testResults = mutable.MutableList()
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+ val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+ .select('a, 'b, 'c)
+
+ val unionDs = ds1.unionAll(ds2)
+
+ val results = unionDs.toAppendStream[Row]
+ results.addSink(new StreamITCase.StringSink[Row])
+ env.execute()
+
+ assertEquals(true, StreamITCase.testResults.isEmpty)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnionTablesFromDifferentEnv(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ val tEnv1 = TableEnvironment.getTableEnvironment(env)
+ val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+ val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
+ val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
+
+ // Must fail. Tables are bound to different TableEnvironments.
+ ds1.unionAll(ds2)
+ }
+}