You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:32 UTC
[03/47] flink git commit: [FLINK-4704] [table] Refactor package
structure of flink-table.
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
new file mode 100644
index 0000000..f35ee76
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+ * Integration tests for `unionAll` on streaming tables.
+ *
+ * Tests that run a job collect their output through `StreamITCase.StringSink` into
+ * `StreamITCase.testResults`, which is replaced with an empty list before the job is built.
+ */
+class UnionITCase extends StreamingMultipleProgramsTestBase {
+
+  /** Unions two tables built from the same small stream; every value of the third column appears twice. */
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f)
+
+    val unionDs = ds1.unionAll(ds2).select('c)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // Arrival order is not asserted, hence both sides are sorted before comparing.
+    val expected = mutable.MutableList(
+      "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+   * Unions the small 3-tuple table with a projection of the 5-tuple table and keeps rows
+   * with `b < 2`. In the test data exactly one row per input satisfies that predicate.
+   */
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    // The String column of the 5-tuple is its fourth field, so it is the one named 'c here.
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Hi", "Hallo")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Unions a 3-column table with a 5-column table; a `ValidationException` is expected. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionFieldsNameNotOverlap1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertTrue(StreamITCase.testResults.isEmpty)
+  }
+
+  /**
+   * Unions two 3-column tables with equal field names but different types: the third column
+   * is a String in `ds1` and an Int in `ds2`. A `ValidationException` is expected.
+   */
+  @Test(expected = classOf[ValidationException])
+  def testUnionFieldsNameNotOverlap2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertTrue(StreamITCase.testResults.isEmpty)
+  }
+
+  /** Unions tables that were created with two distinct table environments. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
new file mode 100644
index 0000000..b6a6660
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit.Test
+
+/**
+ * Checks a set of Table API operations on streaming tables: every test below is declared
+ * to expect a [[ValidationException]].
+ */
+class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
+
+  /**
+   * Builds `count` tables from the small 3-tuple test stream. All of them share one
+   * execution environment and one table environment; no explicit field names are given.
+   */
+  private def smallTables(count: Int) = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    Seq.fill(count)(StreamTestData.getSmall3TupleDataStream(execEnv).toTable(tableEnv))
+  }
+
+  /** Aggregation inside `select`. */
+  @Test(expected = classOf[ValidationException])
+  def testSelectWithAggregation(): Unit = {
+    val Seq(source) = smallTables(1)
+    source.select('_1.min)
+  }
+
+  /** `distinct`. */
+  @Test(expected = classOf[ValidationException])
+  def testDistinct(): Unit = {
+    val Seq(source) = smallTables(1)
+    source.distinct()
+  }
+
+  /** `orderBy`. */
+  @Test(expected = classOf[ValidationException])
+  def testSort(): Unit = {
+    val Seq(source) = smallTables(1)
+    source.orderBy('_1.desc)
+  }
+
+  /** `join` of two tables. */
+  @Test(expected = classOf[ValidationException])
+  def testJoin(): Unit = {
+    val Seq(left, right) = smallTables(2)
+    left.join(right)
+  }
+
+  /** `union` (as opposed to `unionAll`). */
+  @Test(expected = classOf[ValidationException])
+  def testUnion(): Unit = {
+    val Seq(left, right) = smallTables(2)
+    left.union(right)
+  }
+
+  /** `intersect`. */
+  @Test(expected = classOf[ValidationException])
+  def testIntersect(): Unit = {
+    val Seq(left, right) = smallTables(2)
+    left.intersect(right)
+  }
+
+  /** `intersectAll`. */
+  @Test(expected = classOf[ValidationException])
+  def testIntersectAll(): Unit = {
+    val Seq(left, right) = smallTables(2)
+    left.intersectAll(right)
+  }
+
+  /** `minus`. */
+  @Test(expected = classOf[ValidationException])
+  def testMinus(): Unit = {
+    val Seq(left, right) = smallTables(2)
+    left.minus(right)
+  }
+
+  /** `minusAll`. */
+  @Test(expected = classOf[ValidationException])
+  def testMinusAll(): Unit = {
+    val Seq(left, right) = smallTables(2)
+    left.minusAll(right)
+  }
+
+  /** `limit` with offset and fetch. */
+  @Test(expected = classOf[ValidationException])
+  def testLimit(): Unit = {
+    val Seq(source) = smallTables(1)
+    source.limit(0, 5)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..168f9ec
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils._
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
+import org.junit.Assert.{assertTrue, fail}
+import org.junit.Test
+import org.mockito.Mockito._
+
+/**
+ * Tests for user-defined table functions applied to streaming tables. They compare Scala
+ * expression calls with their string-expression counterparts, check the error messages of
+ * invalid usages and verify the expected plan nodes of table function joins.
+ */
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  /** Builds the same queries via Scala expressions and via string expressions and compares them. */
+  @Test
+  def testJavaScalaTableAPIEquality(): Unit = {
+    // mock
+    val ds = mock(classOf[DataStream[Row]])
+    val jDs = mock(classOf[JDataStream[Row]])
+    val typeInfo = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    when(ds.javaStream).thenReturn(jDs)
+    when(jDs.getType).thenReturn(typeInfo)
+
+    // Scala environment
+    val env = mock(classOf[ScalaExecutionEnv])
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val in1 = ds.toTable(tableEnv).as('a, 'b, 'c)
+
+    // Java environment
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val in2 = javaTableEnv.fromDataStream(jDs).as("a, b, c")
+
+    // test cross join
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+    var scalaTable = in1.join(func1('c) as 's).select('c, 's)
+    var javaTable = in2.join("func1(c).as(s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test left outer join
+    scalaTable = in1.leftOuterJoin(func1('c) as 's).select('c, 's)
+    javaTable = in2.leftOuterJoin("as(func1(c), s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test overloading
+    scalaTable = in1.join(func1('c, "$") as 's).select('c, 's)
+    javaTable = in2.join("func1(c, '$') as (s)").select("c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test custom result type
+    val func2 = new TableFunc2
+    javaTableEnv.registerFunction("func2", func2)
+    scalaTable = in1.join(func2('c) as ('name, 'len)).select('c, 'name, 'len)
+    javaTable = in2.join("func2(c).as(name, len)").select("c, name, len")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    javaTableEnv.registerFunction("hierarchy", hierarchy)
+    scalaTable = in1.join(hierarchy('c) as ('name, 'adult, 'len))
+      .select('c, 'name, 'len, 'adult)
+    javaTable = in2.join("AS(hierarchy(c), name, adult, len)")
+      .select("c, name, len, adult")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    javaTableEnv.registerFunction("pojo", pojo)
+    scalaTable = in1.join(pojo('c))
+      .select('c, 'name, 'age)
+    javaTable = in2.join("pojo(c)")
+      .select("c, name, age")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with filter
+    scalaTable = in1.join(func2('c) as ('name, 'len))
+      .select('c, 'name, 'len).filter('len > 2)
+    javaTable = in2.join("func2(c) as (name, len)")
+      .select("c, name, len").filter("len > 2")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // test with scalar function
+    scalaTable = in1.join(func1('c.substring(2)) as 's)
+      .select('a, 'c, 's)
+    javaTable = in2.join("func1(substring(c, 2)) as (s)")
+      .select("a, c, s")
+    verifyTableEquals(scalaTable, javaTable)
+
+    // check scala object is forbidden
+    expectExceptionThrown(
+      tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(
+      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(
+      in1.join(ObjectTableFunction('a, 1)), "Scala object")
+
+  }
+
+  /** Checks the exception type and message produced by several invalid table function usages. */
+  @Test
+  def testInvalidTableFunction(): Unit = {
+    // mock
+    val util = streamTestUtil()
+    val t = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val tEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
+
+    //=================== check scala object is forbidden =====================
+    // Scala table environment register
+    expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
+    // Java table environment register
+    expectExceptionThrown(tEnv.registerFunction("udtf", ObjectTableFunction), "Scala object")
+    // Scala Table API directly call
+    expectExceptionThrown(t.join(ObjectTableFunction('a, 1)), "Scala object")
+
+
+    //============ throw exception when table function is not registered =========
+    // Java Table API call
+    expectExceptionThrown(t.join("nonexist(a)"), "Undefined function: NONEXIST")
+    // SQL API call
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+      "No match found for function signature nonexist(<NUMERIC>)")
+
+
+    //========= throw exception when the called function is a scalar function ====
+    util.addFunction("func0", Func0)
+    // Java Table API call
+    expectExceptionThrown(
+      t.join("func0(a)"),
+      "only accept expressions that define table functions",
+      classOf[TableException])
+    // SQL API call
+    // NOTE: it doesn't throw an exception but an AssertionError, maybe a Calcite bug
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+      null,
+      classOf[AssertionError])
+
+    //========== throw exception when the parameters is not correct ===============
+    // Java Table API call
+    util.addFunction("func2", new TableFunc2)
+    expectExceptionThrown(
+      t.join("func2(c, c)"),
+      "Given parameters of function 'FUNC2' do not match any signature")
+    // SQL API call
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
+  }
+
+  /** Verifies the plan of an inner join with a table function, including an overloaded call. */
+  @Test
+  def testCrossJoin(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result1 = table.join(function('c) as 's).select('c, 's)
+
+    val expected1 = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result1, expected1)
+
+    // test overloading
+
+    val result2 = table.join(function('c, "$") as 's).select('c, 's)
+
+    val expected2 = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2, '$$')"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result2, expected2)
+  }
+
+  /** Verifies the plan of a left outer join with a table function. */
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.leftOuterJoin(function('c) as 's).select('c, 's)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "LEFT")
+      ),
+      term("select", "c", "s")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** Verifies the plan when the table function result is renamed to two fields. */
+  @Test
+  def testCustomType(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func2", new TableFunc2)
+
+    val result = table.join(function('c) as ('name, 'len)).select('c, 'name, 'len)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+            "VARCHAR(2147483647) name, INTEGER len)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "name", "len")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** Verifies the plan for `HierarchyTableFunction`, whose result is renamed to three fields. */
+  @Test
+  def testHierarchyType(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("hierarchy", new HierarchyTableFunction)
+
+    val result = table.join(function('c) as ('name, 'adult, 'len))
+
+    val expected = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$function($$2)"),
+      term("function", function),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+          " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** Verifies the plan for `PojoTableFunc`, whose result fields are not renamed. */
+  @Test
+  def testPojoType(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("pojo", new PojoTableFunc)
+
+    val result = table.join(function('c))
+
+    val expected = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$function($$2)"),
+      term("function", function),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+          "INTEGER age, VARCHAR(2147483647) name)"),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** Verifies that a filter on the function output appears as a condition of the correlate node. */
+  @Test
+  def testFilter(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func2", new TableFunc2)
+
+    val result = table
+      .join(function('c) as ('name, 'len))
+      .select('c, 'name, 'len)
+      .filter('len > 2)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", s"$function($$2)"),
+        term("function", function),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+            "VARCHAR(2147483647) name, INTEGER len)"),
+        term("joinType", "INNER"),
+        term("condition", ">($1, 2)")
+      ),
+      term("select", "c", "name", "len")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  /** Verifies the plan when the table function argument is itself a scalar function call. */
+  @Test
+  def testScalarFunction(): Unit = {
+    val util = streamTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val function = util.addFunction("func1", new TableFunc1)
+
+    val result = table.join(function('c.substring(2)) as 's)
+
+    val expected = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$function(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"),
+      term("function", function),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+   * Evaluates `function` and asserts that it throws an exception of exactly class `clazz`.
+   *
+   * @param function code under test, evaluated once
+   * @param keywords text the exception message must contain; `null` disables the message check
+   * @param clazz    exact class of the expected exception (subclasses do not match)
+   */
+  private def expectExceptionThrown(
+      function: => Unit,
+      keywords: String,
+      clazz: Class[_ <: Throwable] = classOf[ValidationException])
+    : Unit = {
+    // Capture the outcome before asserting anything. Calling fail() inside the try would
+    // raise an AssertionError that the catch below intercepts, which makes the check pass
+    // vacuously whenever `clazz` is AssertionError and nothing was thrown.
+    val thrown: Option[Throwable] =
+      try {
+        function
+        None
+      } catch {
+        // NonFatal still matches AssertionError but lets fatal JVM errors propagate.
+        case scala.util.control.NonFatal(e) => Some(e)
+      }
+
+    thrown match {
+      case None =>
+        fail(s"Expected a $clazz, but no exception is thrown.")
+      case Some(e) if e.getClass == clazz =>
+        if (keywords != null) {
+          // A missing message counts as a mismatch instead of raising a NullPointerException.
+          assertTrue(
+            s"The exception message '${e.getMessage}' doesn't contain keyword '$keywords'",
+            e.getMessage != null && e.getMessage.contains(keywords))
+        }
+      case Some(e) =>
+        fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
new file mode 100644
index 0000000..f826bba
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.utils
+
+import java.util.Collections
+
+import org.apache.flink.types.Row
+import org.junit.Assert._
+
+import scala.collection.mutable
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+
+import scala.collection.JavaConverters._
+
+/**
+ * Shared result collector for streaming tests: [[StreamITCase.StringSink]] appends every
+ * received row to `testResults`, which tests inspect after the job has run.
+ */
+object StreamITCase {
+
+  /** String form of every row received by a [[StringSink]]; tests reassign or clear it. */
+  var testResults = mutable.MutableList.empty[String]
+
+  /**
+   * Empties the currently installed result list in place.
+   *
+   * Declared without a parameter list so that existing `StreamITCase.clear` calls keep
+   * compiling.
+   */
+  def clear: Unit = {
+    StreamITCase.testResults.clear()
+  }
+
+  /**
+   * Asserts that the collected results equal `expected`, ignoring order.
+   *
+   * Both sides are compared as sorted copies, so `expected` is left untouched and may be
+   * an unmodifiable list.
+   *
+   * @param expected the expected result strings in any order
+   */
+  def compareWithList(expected: java.util.List[String]): Unit = {
+    assertEquals(expected.asScala.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** Sink that records the `toString` of each incoming row in `testResults`. */
+  final class StringSink extends RichSinkFunction[Row]() {
+    def invoke(value: Row): Unit = {
+      // The lock is the list instance installed at the time of the call.
+      testResults.synchronized {
+        testResults += value.toString
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
new file mode 100644
index 0000000..6745039
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.utils
+
+import org.apache.flink.api.scala._
+import scala.collection.mutable
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+/** Fixed in-memory data sets exposed as data streams for the streaming tests. */
+object StreamTestData {
+
+  /** Returns a stream of three `(Int, Long, String)` records. */
+  def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
+    val rows: Seq[(Int, Long, String)] = Seq(
+      (1, 1L, "Hi"),
+      (2, 2L, "Hello"),
+      (3, 2L, "Hello world"))
+    env.fromCollection(rows)
+  }
+
+  /** Returns a stream of 21 `(Int, Long, String)` records; the first three match the small set. */
+  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
+    val rows: Seq[(Int, Long, String)] = Seq(
+      (1, 1L, "Hi"),
+      (2, 2L, "Hello"),
+      (3, 2L, "Hello world"),
+      (4, 3L, "Hello world, how are you?"),
+      (5, 3L, "I am fine."),
+      (6, 3L, "Luke Skywalker"),
+      (7, 4L, "Comment#1"),
+      (8, 4L, "Comment#2"),
+      (9, 4L, "Comment#3"),
+      (10, 4L, "Comment#4"),
+      (11, 5L, "Comment#5"),
+      (12, 5L, "Comment#6"),
+      (13, 5L, "Comment#7"),
+      (14, 5L, "Comment#8"),
+      (15, 5L, "Comment#9"),
+      (16, 6L, "Comment#10"),
+      (17, 6L, "Comment#11"),
+      (18, 6L, "Comment#12"),
+      (19, 6L, "Comment#13"),
+      (20, 6L, "Comment#14"),
+      (21, 6L, "Comment#15"))
+    env.fromCollection(rows)
+  }
+
+  /** Returns a stream of 15 `(Int, Long, Int, String, Long)` records. */
+  def get5TupleDataStream(env: StreamExecutionEnvironment):
+      DataStream[(Int, Long, Int, String, Long)] = {
+
+    val rows: Seq[(Int, Long, Int, String, Long)] = Seq(
+      (1, 1L, 0, "Hallo", 1L),
+      (2, 2L, 1, "Hallo Welt", 2L),
+      (2, 3L, 2, "Hallo Welt wie", 1L),
+      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+      (3, 5L, 4, "ABC", 2L),
+      (3, 6L, 5, "BCD", 3L),
+      (4, 7L, 6, "CDE", 2L),
+      (4, 8L, 7, "DEF", 1L),
+      (4, 9L, 8, "EFG", 1L),
+      (4, 10L, 9, "FGH", 2L),
+      (5, 11L, 10, "GHI", 1L),
+      (5, 12L, 11, "HIJ", 3L),
+      (5, 13L, 12, "IJK", 3L),
+      (5, 14L, 13, "JKL", 2L),
+      (5, 15L, 14, "KLM", 2L))
+    env.fromCollection(rows)
+  }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
new file mode 100644
index 0000000..49cf572
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Date
+
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+class ArrayTypeTest extends ExpressionTestBase {
+
+ @Test(expected = classOf[ValidationException])
+ def testObviousInvalidIndexTableApi(): Unit = {
+ testTableApi('f2.at(0), "FAIL", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testEmptyArraySql(): Unit = {
+ testSqlApi("ARRAY[]", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testEmptyArrayTableApi(): Unit = {
+ testTableApi("FAIL", "array()", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testNullArraySql(): Unit = {
+ testSqlApi("ARRAY[NULL]", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testDifferentTypesArraySql(): Unit = {
+ testSqlApi("ARRAY[1, TRUE]", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testDifferentTypesArrayTableApi(): Unit = {
+ testTableApi("FAIL", "array(1, true)", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testUnsupportedComparison(): Unit = {
+ testAllApis(
+ 'f2 <= 'f5.at(1),
+ "f2 <= f5.at(1)",
+ "f2 <= f5[1]",
+ "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testElementNonArray(): Unit = {
+ testTableApi(
+ 'f0.element(),
+ "FAIL",
+ "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testElementNonArraySql(): Unit = {
+ testSqlApi(
+ "ELEMENT(f0)",
+ "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testCardinalityOnNonArray(): Unit = {
+ testTableApi('f0.cardinality(), "FAIL", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testCardinalityOnNonArraySql(): Unit = {
+ testSqlApi("CARDINALITY(f0)", "FAIL")
+ }
+
+ @Test
+ def testArrayLiterals(): Unit = {
+ // primitive literals
+ testAllApis(array(1, 2, 3), "array(1, 2, 3)", "ARRAY[1, 2, 3]", "[1, 2, 3]")
+
+ testAllApis(
+ array(true, true, true),
+ "array(true, true, true)",
+ "ARRAY[TRUE, TRUE, TRUE]",
+ "[true, true, true]")
+
+ // object literals
+ testTableApi(array(BigDecimal(1), BigDecimal(1)), "array(1p, 1p)", "[1, 1]")
+
+ testAllApis(
+ array(array(array(1), array(1))),
+ "array(array(array(1), array(1)))",
+ "ARRAY[ARRAY[ARRAY[1], ARRAY[1]]]",
+ "[[[1], [1]]]")
+
+ testAllApis(
+ array(1 + 1, 3 * 3),
+ "array(1 + 1, 3 * 3)",
+ "ARRAY[1 + 1, 3 * 3]",
+ "[2, 9]")
+
+ testAllApis(
+ array(Null(Types.INT), 1),
+ "array(Null(INT), 1)",
+ "ARRAY[NULLIF(1,1), 1]",
+ "[null, 1]")
+
+ testAllApis(
+ array(array(Null(Types.INT), 1)),
+ "array(array(Null(INT), 1))",
+ "ARRAY[ARRAY[NULLIF(1,1), 1]]",
+ "[[null, 1]]")
+
+ // implicit conversion
+ testTableApi(
+ Array(1, 2, 3),
+ "array(1, 2, 3)",
+ "[1, 2, 3]")
+
+ testTableApi(
+ Array[Integer](1, 2, 3),
+ "array(1, 2, 3)",
+ "[1, 2, 3]")
+
+ testAllApis(
+ Array(Date.valueOf("1985-04-11")),
+ "array('1985-04-11'.toDate)",
+ "ARRAY[DATE '1985-04-11']",
+ "[1985-04-11]")
+
+ testAllApis(
+ Array(BigDecimal(2.0002), BigDecimal(2.0003)),
+ "Array(2.0002p, 2.0003p)",
+ "ARRAY[CAST(2.0002 AS DECIMAL), CAST(2.0003 AS DECIMAL)]",
+ "[2.0002, 2.0003]")
+
+ testAllApis(
+ Array(Array(x = true)),
+ "Array(Array(true))",
+ "ARRAY[ARRAY[TRUE]]",
+ "[[true]]")
+
+ testAllApis(
+ Array(Array(1, 2, 3), Array(3, 2, 1)),
+ "Array(Array(1, 2, 3), Array(3, 2, 1))",
+ "ARRAY[ARRAY[1, 2, 3], ARRAY[3, 2, 1]]",
+ "[[1, 2, 3], [3, 2, 1]]")
+ }
+
+ /**
+ * Checks array-typed fields across the Scala Table API, the string expression API and SQL:
+ * array construction from fields, printing of whole array fields, and element access
+ * via `at` / `[...]` (including nested arrays and null arrays).
+ *
+ * Field values come from `testData` of this class.
+ */
+ @Test
+ def testArrayField(): Unit = {
+ // array constructed from two scalar fields (f0 is null, f1 is 42)
+ testAllApis(
+ array('f0, 'f1),
+ "array(f0, f1)",
+ "ARRAY[f0, f1]",
+ "[null, 42]")
+
+ // whole array fields
+ testAllApis(
+ 'f2,
+ "f2",
+ "f2",
+ "[1, 2, 3]")
+
+ testAllApis(
+ 'f3,
+ "f3",
+ "f3",
+ "[1984-03-12, 1984-02-10]")
+
+ testAllApis(
+ 'f5,
+ "f5",
+ "f5",
+ "[[1, 2, 3], null]")
+
+ testAllApis(
+ 'f6,
+ "f6",
+ "f6",
+ "[1, null, null, 4]")
+
+ // element access; the expected values show that indexes are 1-based
+ testAllApis(
+ 'f2.at(1),
+ "f2.at(1)",
+ "f2[1]",
+ "1")
+
+ testAllApis(
+ 'f3.at(1),
+ "f3.at(1)",
+ "f3[1]",
+ "1984-03-12")
+
+ testAllApis(
+ 'f3.at(2),
+ "f3.at(2)",
+ "f3[2]",
+ "1984-02-10")
+
+ // nested element access
+ testAllApis(
+ 'f5.at(1).at(2),
+ "f5.at(1).at(2)",
+ "f5[1][2]",
+ "2")
+
+ // second element of f5 is a null array, so accessing into it yields null
+ testAllApis(
+ 'f5.at(2).at(2),
+ "f5.at(2).at(2)",
+ "f5[2][2]",
+ "null")
+
+ // f4 itself is null
+ testAllApis(
+ 'f4.at(2).at(2),
+ "f4.at(2).at(2)",
+ "f4[2][2]",
+ "null")
+ }
+
+ @Test
+ def testArrayOperations(): Unit = {
+ // cardinality
+ testAllApis(
+ 'f2.cardinality(),
+ "f2.cardinality()",
+ "CARDINALITY(f2)",
+ "3")
+
+ testAllApis(
+ 'f4.cardinality(),
+ "f4.cardinality()",
+ "CARDINALITY(f4)",
+ "null")
+
+ // element
+ testAllApis(
+ 'f9.element(),
+ "f9.element()",
+ "ELEMENT(f9)",
+ "1")
+
+ testAllApis(
+ 'f8.element(),
+ "f8.element()",
+ "ELEMENT(f8)",
+ "4.0")
+
+ testAllApis(
+ 'f10.element(),
+ "f10.element()",
+ "ELEMENT(f10)",
+ "null")
+
+ testAllApis(
+ 'f4.element(),
+ "f4.element()",
+ "ELEMENT(f4)",
+ "null")
+
+ // comparison
+ testAllApis(
+ 'f2 === 'f5.at(1),
+ "f2 === f5.at(1)",
+ "f2 = f5[1]",
+ "true")
+
+ testAllApis(
+ 'f6 === array(1, 2, 3),
+ "f6 === array(1, 2, 3)",
+ "f6 = ARRAY[1, 2, 3]",
+ "false")
+
+ testAllApis(
+ 'f2 !== 'f5.at(1),
+ "f2 !== f5.at(1)",
+ "f2 <> f5[1]",
+ "false")
+
+ testAllApis(
+ 'f2 === 'f7,
+ "f2 === f7",
+ "f2 = f7",
+ "false")
+
+ testAllApis(
+ 'f2 !== 'f7,
+ "f2 !== f7",
+ "f2 <> f7",
+ "true")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ case class MyCaseClass(string: String, int: Int)
+
+ override def testData: Any = {
+ val testData = new Row(11)
+ testData.setField(0, null)
+ testData.setField(1, 42)
+ testData.setField(2, Array(1, 2, 3))
+ testData.setField(3, Array(Date.valueOf("1984-03-12"), Date.valueOf("1984-02-10")))
+ testData.setField(4, null)
+ testData.setField(5, Array(Array(1, 2, 3), null))
+ testData.setField(6, Array[Integer](1, null, null, 4))
+ testData.setField(7, Array(1, 2, 3, 4))
+ testData.setField(8, Array(4.0))
+ testData.setField(9, Array[Integer](1))
+ testData.setField(10, Array[Integer]())
+ testData
+ }
+
+ override def typeInfo: TypeInformation[Any] = {
+ new RowTypeInfo(
+ Types.INT,
+ Types.INT,
+ PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+ ObjectArrayTypeInfo.getInfoFor(Types.DATE),
+ ObjectArrayTypeInfo.getInfoFor(ObjectArrayTypeInfo.getInfoFor(Types.INT)),
+ ObjectArrayTypeInfo.getInfoFor(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO),
+ ObjectArrayTypeInfo.getInfoFor(Types.INT),
+ PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+ PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+ ObjectArrayTypeInfo.getInfoFor(Types.INT),
+ ObjectArrayTypeInfo.getInfoFor(Types.INT)
+ ).asInstanceOf[TypeInformation[Any]]
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
new file mode 100644
index 0000000..2025880
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo}
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.Test
+
+
+class CompositeAccessTest extends ExpressionTestBase {
+
+ @Test
+ def testGetField(): Unit = {
+
+ // single field by string key
+ testAllApis(
+ 'f0.get("intField"),
+ "f0.get('intField')",
+ "testTable.f0.intField",
+ "42")
+
+ testSqlApi("testTable.f0.stringField", "Bob")
+
+ testSqlApi("testTable.f0.booleanField", "true")
+
+ // single field by int key
+ testTableApi(
+ 'f0.get(0),
+ "f0.get(0)",
+ "42")
+
+ // nested single field
+ testAllApis(
+ 'f1.get("objectField").get("intField"),
+ "f1.get('objectField').get('intField')",
+ "testTable.f1.objectField.intField",
+ "25")
+
+ testSqlApi("testTable.f1.objectField.stringField", "Timo")
+
+ testSqlApi("testTable.f1.objectField.booleanField", "false")
+
+ testAllApis(
+ 'f2.get(0),
+ "f2.get(0)",
+ "testTable.f2._1",
+ "a")
+
+ testSqlApi("testTable.f3.f1", "b")
+
+ testSqlApi("testTable.f4.myString", "Hello")
+
+ testSqlApi("testTable.f5", "13")
+
+ testAllApis(
+ 'f7.get("_1"),
+ "get(f7, '_1')",
+ "testTable.f7._1",
+ "true")
+
+ // composite field return type
+ testSqlApi("testTable.f6", "MyCaseClass2(null)")
+
+ testAllApis(
+ 'f1.get("objectField"),
+ "f1.get('objectField')",
+ "testTable.f1.objectField",
+ "MyCaseClass(25,Timo,false)")
+
+ testAllApis(
+ 'f0,
+ "f0",
+ "testTable.f0",
+ "MyCaseClass(42,Bob,true)")
+
+ // flattening (test base only returns first column)
+ testAllApis(
+ 'f1.get("objectField").flatten(),
+ "f1.get('objectField').flatten()",
+ "testTable.f1.objectField.*",
+ "25")
+
+ testAllApis(
+ 'f0.flatten(),
+ "flatten(f0)",
+ "testTable.f0.*",
+ "42")
+
+ testTableApi(12.flatten(), "12.flatten()", "12")
+
+ testTableApi('f5.flatten(), "f5.flatten()", "13")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWrongSqlField(): Unit = {
+ testSqlApi("testTable.f5.test", "13")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWrongIntKeyField(): Unit = {
+ testTableApi('f0.get(555), "'fail'", "fail")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWrongIntKeyField2(): Unit = {
+ testTableApi("fail", "f0.get(555)", "fail")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWrongStringKeyField(): Unit = {
+ testTableApi('f0.get("fghj"), "'fail'", "fail")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testWrongStringKeyField2(): Unit = {
+ testTableApi("fail", "f0.get('fghj')", "fail")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ def testData = {
+ val testData = new Row(8)
+ testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
+ testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
+ testData.setField(2, ("a", "b"))
+ testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
+ testData.setField(4, new MyPojo())
+ testData.setField(5, 13)
+ testData.setField(6, MyCaseClass2(null))
+ testData.setField(7, Tuple1(true))
+ testData
+ }
+
+ def typeInfo = {
+ new RowTypeInfo(
+ createTypeInformation[MyCaseClass],
+ createTypeInformation[MyCaseClass2],
+ createTypeInformation[(String, String)],
+ new TupleTypeInfo(Types.STRING, Types.STRING),
+ TypeExtractor.createTypeInfo(classOf[MyPojo]),
+ Types.INT,
+ createTypeInformation[MyCaseClass2],
+ createTypeInformation[Tuple1[Boolean]]
+ ).asInstanceOf[TypeInformation[Any]]
+ }
+
+}
+
+object CompositeAccessTest {
+ case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
+
+ case class MyCaseClass2(objectField: MyCaseClass)
+
+ /**
+ * Simple mutable POJO with bean-style getters and setters, used as a composite
+ * test field (`myInt` defaults to 0, `myString` defaults to "Hello").
+ */
+ class MyPojo {
+ private var myInt: Int = 0
+ private var myString: String = "Hello"
+
+ def getMyInt: Int = myInt
+
+ def setMyInt(value: Int): Unit = {
+ myInt = value
+ }
+
+ def getMyString: String = myString
+
+ def setMyString(value: String): Unit = {
+ // store the argument; previously the field was assigned to itself and the value was lost
+ myString = value
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
new file mode 100644
index 0000000..bdc239d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.Test
+
+class DecimalTypeTest extends ExpressionTestBase {
+
+ @Test
+ def testDecimalLiterals(): Unit = {
+ // implicit double
+ testAllApis(
+ 11.2,
+ "11.2",
+ "11.2",
+ "11.2")
+
+ // implicit double
+ testAllApis(
+ 0.7623533651719233,
+ "0.7623533651719233",
+ "0.7623533651719233",
+ "0.7623533651719233")
+
+ // explicit decimal (with precision of 19)
+ testAllApis(
+ BigDecimal("1234567891234567891"),
+ "1234567891234567891p",
+ "1234567891234567891",
+ "1234567891234567891")
+
+ // explicit decimal (high precision, not SQL compliant)
+ testTableApi(
+ BigDecimal("123456789123456789123456789"),
+ "123456789123456789123456789p",
+ "123456789123456789123456789")
+
+ // explicit decimal (high precision, not SQL compliant)
+ testTableApi(
+ BigDecimal("12.3456789123456789123456789"),
+ "12.3456789123456789123456789p",
+ "12.3456789123456789123456789")
+ }
+
+ @Test
+ def testDecimalBorders(): Unit = {
+ testAllApis(
+ Double.MaxValue,
+ Double.MaxValue.toString,
+ Double.MaxValue.toString,
+ Double.MaxValue.toString)
+
+ testAllApis(
+ Double.MinValue,
+ Double.MinValue.toString,
+ Double.MinValue.toString,
+ Double.MinValue.toString)
+
+ testAllApis(
+ Double.MinValue.cast(Types.FLOAT),
+ s"${Double.MinValue}.cast(FLOAT)",
+ s"CAST(${Double.MinValue} AS FLOAT)",
+ Float.NegativeInfinity.toString)
+
+ testAllApis(
+ Byte.MinValue.cast(Types.BYTE),
+ s"(${Byte.MinValue}).cast(BYTE)",
+ s"CAST(${Byte.MinValue} AS TINYINT)",
+ Byte.MinValue.toString)
+
+ testAllApis(
+ Byte.MinValue.cast(Types.BYTE) - 1.cast(Types.BYTE),
+ s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)",
+ s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)",
+ Byte.MaxValue.toString)
+
+ testAllApis(
+ Short.MinValue.cast(Types.SHORT),
+ s"(${Short.MinValue}).cast(SHORT)",
+ s"CAST(${Short.MinValue} AS SMALLINT)",
+ Short.MinValue.toString)
+
+ testAllApis(
+ Int.MinValue.cast(Types.INT) - 1,
+ s"(${Int.MinValue}).cast(INT) - 1",
+ s"CAST(${Int.MinValue} AS INT) - 1",
+ Int.MaxValue.toString)
+
+ testAllApis(
+ Long.MinValue.cast(Types.LONG),
+ s"(${Long.MinValue}L).cast(LONG)",
+ s"CAST(${Long.MinValue} AS BIGINT)",
+ Long.MinValue.toString)
+ }
+
+ @Test
+ def testDecimalCasting(): Unit = {
+ // from String
+ testTableApi(
+ "123456789123456789123456789".cast(Types.DECIMAL),
+ "'123456789123456789123456789'.cast(DECIMAL)",
+ "123456789123456789123456789")
+
+ // from double
+ testAllApis(
+ 'f3.cast(Types.DECIMAL),
+ "f3.cast(DECIMAL)",
+ "CAST(f3 AS DECIMAL)",
+ "4.2")
+
+ // to double
+ testAllApis(
+ 'f0.cast(Types.DOUBLE),
+ "f0.cast(DOUBLE)",
+ "CAST(f0 AS DOUBLE)",
+ "1.2345678912345679E8")
+
+ // to int
+ testAllApis(
+ 'f4.cast(Types.INT),
+ "f4.cast(INT)",
+ "CAST(f4 AS INT)",
+ "123456789")
+
+ // to long
+ testAllApis(
+ 'f4.cast(Types.LONG),
+ "f4.cast(LONG)",
+ "CAST(f4 AS BIGINT)",
+ "123456789")
+
+ // to boolean (not SQL compliant)
+ testTableApi(
+ 'f1.cast(Types.BOOLEAN),
+ "f1.cast(BOOL)",
+ "true")
+
+ testTableApi(
+ 'f5.cast(Types.BOOLEAN),
+ "f5.cast(BOOL)",
+ "false")
+
+ testTableApi(
+ BigDecimal("123456789.123456789123456789").cast(Types.DOUBLE),
+ "(123456789.123456789123456789p).cast(DOUBLE)",
+ "1.2345678912345679E8")
+ }
+
+ @Test
+ def testDecimalArithmetic(): Unit = {
+ // implicit cast to decimal
+ testAllApis(
+ 'f1 + 12,
+ "f1 + 12",
+ "f1 + 12",
+ "123456789123456789123456801")
+
+ // implicit cast to decimal
+ testAllApis(
+ Literal(12) + 'f1,
+ "12 + f1",
+ "12 + f1",
+ "123456789123456789123456801")
+
+ // implicit cast to decimal
+ testAllApis(
+ 'f1 + 12.3,
+ "f1 + 12.3",
+ "f1 + 12.3",
+ "123456789123456789123456801.3")
+
+ // implicit cast to decimal
+ testAllApis(
+ Literal(12.3) + 'f1,
+ "12.3 + f1",
+ "12.3 + f1",
+ "123456789123456789123456801.3")
+
+ testAllApis(
+ 'f1 + 'f1,
+ "f1 + f1",
+ "f1 + f1",
+ "246913578246913578246913578")
+
+ testAllApis(
+ 'f1 - 'f1,
+ "f1 - f1",
+ "f1 - f1",
+ "0")
+
+ testAllApis(
+ 'f1 * 'f1,
+ "f1 * f1",
+ "f1 * f1",
+ "15241578780673678546105778281054720515622620750190521")
+
+ testAllApis(
+ 'f1 / 'f1,
+ "f1 / f1",
+ "f1 / f1",
+ "1")
+
+ testAllApis(
+ 'f1 % 'f1,
+ "f1 % f1",
+ "MOD(f1, f1)",
+ "0")
+
+ testAllApis(
+ -'f0,
+ "-f0",
+ "-f0",
+ "-123456789.123456789123456789")
+ }
+
+ @Test
+ def testDecimalComparison(): Unit = {
+ testAllApis(
+ 'f1 < 12,
+ "f1 < 12",
+ "f1 < 12",
+ "false")
+
+ testAllApis(
+ 'f1 > 12,
+ "f1 > 12",
+ "f1 > 12",
+ "true")
+
+ testAllApis(
+ 'f1 === 12,
+ "f1 === 12",
+ "f1 = 12",
+ "false")
+
+ testAllApis(
+ 'f5 === 0,
+ "f5 === 0",
+ "f5 = 0",
+ "true")
+
+ testAllApis(
+ 'f1 === BigDecimal("123456789123456789123456789"),
+ "f1 === 123456789123456789123456789p",
+ "f1 = CAST('123456789123456789123456789' AS DECIMAL)",
+ "true")
+
+ testAllApis(
+ 'f1 !== BigDecimal("123456789123456789123456789"),
+ "f1 !== 123456789123456789123456789p",
+ "f1 <> CAST('123456789123456789123456789' AS DECIMAL)",
+ "false")
+
+ testAllApis(
+ 'f4 < 'f0,
+ "f4 < f0",
+ "f4 < f0",
+ "true")
+
+ // TODO add all tests if FLINK-4070 is fixed
+ testSqlApi(
+ "12 < f1",
+ "true")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ def testData = {
+ val testData = new Row(6)
+ testData.setField(0, BigDecimal("123456789.123456789123456789").bigDecimal)
+ testData.setField(1, BigDecimal("123456789123456789123456789").bigDecimal)
+ testData.setField(2, 42)
+ testData.setField(3, 4.2)
+ testData.setField(4, BigDecimal("123456789").bigDecimal)
+ testData.setField(5, BigDecimal("0.000").bigDecimal)
+ testData
+ }
+
+ def typeInfo = {
+ new RowTypeInfo(
+ Types.DECIMAL,
+ Types.DECIMAL,
+ Types.INT,
+ Types.DOUBLE,
+ Types.DECIMAL,
+ Types.DECIMAL).asInstanceOf[TypeInformation[Any]]
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
new file mode 100644
index 0000000..bcc53af
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.{Ignore, Test}
+
+/**
+ * Tests that can only be checked manually as they are non-deterministic.
+ */
+class NonDeterministicTests extends ExpressionTestBase {
+
+ @Ignore
+ @Test
+ def testCurrentDate(): Unit = {
+ testAllApis(
+ currentDate(),
+ "currentDate()",
+ "CURRENT_DATE",
+ "PLEASE CHECK MANUALLY")
+ }
+
+ @Ignore
+ @Test
+ def testCurrentTime(): Unit = {
+ testAllApis(
+ currentTime(),
+ "currentTime()",
+ "CURRENT_TIME",
+ "PLEASE CHECK MANUALLY")
+ }
+
+ @Ignore
+ @Test
+ def testCurrentTimestamp(): Unit = {
+ testAllApis(
+ currentTimestamp(),
+ "currentTimestamp()",
+ "CURRENT_TIMESTAMP",
+ "PLEASE CHECK MANUALLY")
+ }
+
+ @Ignore
+ @Test
+ def testLocalTimestamp(): Unit = {
+ testAllApis(
+ localTimestamp(),
+ "localTimestamp()",
+ "LOCALTIMESTAMP",
+ "PLEASE CHECK MANUALLY")
+ }
+
+ @Ignore
+ @Test
+ def testLocalTime(): Unit = {
+ testAllApis(
+ localTime(),
+ "localTime()",
+ "LOCALTIME",
+ "PLEASE CHECK MANUALLY")
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ override def testData: Any = new Row(0)
+
+ override def typeInfo: TypeInformation[Any] =
+ new RowTypeInfo().asInstanceOf[TypeInformation[Any]]
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
new file mode 100644
index 0000000..596907b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -0,0 +1,1166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.Test
+
+class ScalarFunctionsTest extends ExpressionTestBase {
+
+ // ----------------------------------------------------------------------------------------------
+ // String functions
+ // ----------------------------------------------------------------------------------------------
+
+ @Test
+ def testOverlay(): Unit = {
+ testAllApis(
+ "xxxxxtest".overlay("xxxx", 6),
+ "'xxxxxtest'.overlay('xxxx', 6)",
+ "OVERLAY('xxxxxtest' PLACING 'xxxx' FROM 6)",
+ "xxxxxxxxx")
+
+ testAllApis(
+ "xxxxxtest".overlay("xxxx", 6, 2),
+ "'xxxxxtest'.overlay('xxxx', 6, 2)",
+ "OVERLAY('xxxxxtest' PLACING 'xxxx' FROM 6 FOR 2)",
+ "xxxxxxxxxst")
+ }
+
+ @Test
+ def testPosition(): Unit = {
+ testAllApis(
+ "test".position("xxxtest"),
+ "'test'.position('xxxtest')",
+ "POSITION('test' IN 'xxxtest')",
+ "4")
+
+ testAllApis(
+ "testx".position("xxxtest"),
+ "'testx'.position('xxxtest')",
+ "POSITION('testx' IN 'xxxtest')",
+ "0")
+ }
+
+ @Test
+ def testSubstring(): Unit = {
+ testAllApis(
+ 'f0.substring(2),
+ "f0.substring(2)",
+ "SUBSTRING(f0, 2)",
+ "his is a test String.")
+
+ testAllApis(
+ 'f0.substring(2, 5),
+ "f0.substring(2, 5)",
+ "SUBSTRING(f0, 2, 5)",
+ "his i")
+
+ testAllApis(
+ 'f0.substring(1, 'f7),
+ "f0.substring(1, f7)",
+ "SUBSTRING(f0, 1, f7)",
+ "Thi")
+
+ testAllApis(
+ 'f0.substring(1.cast(Types.BYTE), 'f7),
+ "f0.substring(1.cast(BYTE), f7)",
+ "SUBSTRING(f0, CAST(1 AS TINYINT), f7)",
+ "Thi")
+
+ testSqlApi(
+ "SUBSTRING(f0 FROM 2 FOR 1)",
+ "h")
+
+ testSqlApi(
+ "SUBSTRING(f0 FROM 2)",
+ "his is a test String.")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidSubstring1(): Unit = {
+ // Must fail. Parameter of substring must be an Integer not a Double.
+ testTableApi("test".substring(2.0.toExpr), "FAIL", "FAIL")
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidSubstring2(): Unit = {
+ // Must fail. Parameter of substring must be an Integer not a String.
+ testTableApi("test".substring("test".toExpr), "FAIL", "FAIL")
+ }
+
+ @Test
+ def testTrim(): Unit = {
+ testAllApis(
+ 'f8.trim(),
+ "f8.trim()",
+ "TRIM(f8)",
+ "This is a test String.")
+
+ testAllApis(
+ 'f8.trim(removeLeading = true, removeTrailing = true, " "),
+ "trim(f8)",
+ "TRIM(f8)",
+ "This is a test String.")
+
+ testAllApis(
+ 'f8.trim(removeLeading = false, removeTrailing = true, " "),
+ "f8.trim(TRAILING, ' ')",
+ "TRIM(TRAILING FROM f8)",
+ " This is a test String.")
+
+ testAllApis(
+ 'f0.trim(removeLeading = true, removeTrailing = true, "."),
+ "trim(BOTH, '.', f0)",
+ "TRIM(BOTH '.' FROM f0)",
+ "This is a test String")
+ }
+
+ @Test
+ def testCharLength(): Unit = {
+ testAllApis(
+ 'f0.charLength(),
+ "f0.charLength()",
+ "CHAR_LENGTH(f0)",
+ "22")
+
+ testAllApis(
+ 'f0.charLength(),
+ "charLength(f0)",
+ "CHARACTER_LENGTH(f0)",
+ "22")
+ }
+
+ @Test
+ def testUpperCase(): Unit = {
+ testAllApis(
+ 'f0.upperCase(),
+ "f0.upperCase()",
+ "UPPER(f0)",
+ "THIS IS A TEST STRING.")
+ }
+
+ @Test
+ def testLowerCase(): Unit = {
+ testAllApis(
+ 'f0.lowerCase(),
+ "f0.lowerCase()",
+ "LOWER(f0)",
+ "this is a test string.")
+ }
+
+ @Test
+ def testInitCap(): Unit = {
+ testAllApis(
+ 'f0.initCap(),
+ "f0.initCap()",
+ "INITCAP(f0)",
+ "This Is A Test String.")
+ }
+
+ @Test
+ def testConcat(): Unit = {
+ testAllApis(
+ 'f0 + 'f0,
+ "f0 + f0",
+ "f0||f0",
+ "This is a test String.This is a test String.")
+ }
+
+ @Test
+ def testLike(): Unit = {
+ testAllApis(
+ 'f0.like("Th_s%"),
+ "f0.like('Th_s%')",
+ "f0 LIKE 'Th_s%'",
+ "true")
+
+ testAllApis(
+ 'f0.like("%is a%"),
+ "f0.like('%is a%')",
+ "f0 LIKE '%is a%'",
+ "true")
+ }
+
+ @Test
+ def testNotLike(): Unit = {
+ testAllApis(
+ !'f0.like("Th_s%"),
+ "!f0.like('Th_s%')",
+ "f0 NOT LIKE 'Th_s%'",
+ "false")
+
+ testAllApis(
+ !'f0.like("%is a%"),
+ "!f0.like('%is a%')",
+ "f0 NOT LIKE '%is a%'",
+ "false")
+ }
+
+ @Test
+ def testLikeWithEscape(): Unit = {
+ testSqlApi(
+ "f23 LIKE '&%Th_s%' ESCAPE '&'",
+ "true")
+
+ testSqlApi(
+ "f23 LIKE '&%%is a%' ESCAPE '&'",
+ "true")
+
+ testSqlApi(
+ "f0 LIKE 'Th_s%' ESCAPE '&'",
+ "true")
+
+ testSqlApi(
+ "f0 LIKE '%is a%' ESCAPE '&'",
+ "true")
+ }
+
+ @Test
+ def testNotLikeWithEscape(): Unit = {
+ testSqlApi(
+ "f23 NOT LIKE '&%Th_s%' ESCAPE '&'",
+ "false")
+
+ testSqlApi(
+ "f23 NOT LIKE '&%%is a%' ESCAPE '&'",
+ "false")
+
+ testSqlApi(
+ "f0 NOT LIKE 'Th_s%' ESCAPE '&'",
+ "false")
+
+ testSqlApi(
+ "f0 NOT LIKE '%is a%' ESCAPE '&'",
+ "false")
+ }
+
+ @Test
+ def testSimilar(): Unit = {
+ testAllApis(
+ 'f0.similar("_*"),
+ "f0.similar('_*')",
+ "f0 SIMILAR TO '_*'",
+ "true")
+
+ testAllApis(
+ 'f0.similar("This (is)? a (test)+ Strin_*"),
+ "f0.similar('This (is)? a (test)+ Strin_*')",
+ "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*'",
+ "true")
+ }
+
+ @Test
+ def testNotSimilar(): Unit = {
+ testAllApis(
+ !'f0.similar("_*"),
+ "!f0.similar('_*')",
+ "f0 NOT SIMILAR TO '_*'",
+ "false")
+
+ testAllApis(
+ !'f0.similar("This (is)? a (test)+ Strin_*"),
+ "!f0.similar('This (is)? a (test)+ Strin_*')",
+ "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*'",
+ "false")
+ }
+
+ @Test
+ def testSimilarWithEscape(): Unit = {
+ testSqlApi(
+ "f24 SIMILAR TO '&*&__*' ESCAPE '&'",
+ "true")
+
+ testSqlApi(
+ "f0 SIMILAR TO '_*' ESCAPE '&'",
+ "true")
+
+ testSqlApi(
+ "f24 SIMILAR TO '&*&_This (is)? a (test)+ Strin_*' ESCAPE '&'",
+ "true")
+
+ testSqlApi(
+ "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*' ESCAPE '&'",
+ "true")
+ }
+
+ /**
+  * SQL-only checks of `NOT SIMILAR TO ... ESCAPE '&'`, all expected to evaluate to "false".
+  *
+  * Uses the same fields and patterns as the positive SIMILAR-with-escape checks, negated.
+  */
+ @Test
+ def testNotSimilarWithEscape(): Unit = {
+   val negatedPredicates = Seq(
+     "f24 NOT SIMILAR TO '&*&__*' ESCAPE '&'",
+     "f0 NOT SIMILAR TO '_*' ESCAPE '&'",
+     "f24 NOT SIMILAR TO '&*&_This (is)? a (test)+ Strin_*' ESCAPE '&'",
+     "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*' ESCAPE '&'")
+
+   negatedPredicates.foreach { predicate =>
+     testSqlApi(predicate, "false")
+   }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // Math functions
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+  * Checks the modulo function in its method, prefix and `%` operator spellings.
+  *
+  * f4 holds 44 and f7 holds 3 (see `testData`), so every case expects "2". The last
+  * case uses the field in the Scala API but the literal 44 in the string and SQL APIs.
+  */
+ @Test
+ def testMod(): Unit = {
+   val cases = Seq(
+     ('f4.mod('f7), "f4.mod(f7)", "MOD(f4, f7)"),
+     ('f4.mod(3), "mod(f4, 3)", "MOD(f4, 3)"),
+     ('f4 % 3, "mod(44, 3)", "MOD(44, 3)"))
+
+   cases.foreach { case (scalaExpr, exprString, sqlExpr) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, "2")
+   }
+ }
+
+ /**
+  * Checks EXP for the byte, short, long, float, double and int fields and for an
+  * integer literal.
+  *
+  * Expected values are computed with `math.exp` on the values stored in `testData`.
+  * Each case passes the same operand to the Scala, string and SQL variants: the int
+  * column f7 is covered by one case and the literal 3 by another, instead of the same
+  * mixed column/literal case being asserted twice.
+  */
+ @Test
+ def testExp(): Unit = {
+   testAllApis(
+     'f2.exp(),
+     "f2.exp()",
+     "EXP(f2)",
+     math.exp(42.toByte).toString)
+
+   testAllApis(
+     'f3.exp(),
+     "f3.exp()",
+     "EXP(f3)",
+     math.exp(43.toShort).toString)
+
+   testAllApis(
+     'f4.exp(),
+     "f4.exp()",
+     "EXP(f4)",
+     math.exp(44.toLong).toString)
+
+   testAllApis(
+     'f5.exp(),
+     "f5.exp()",
+     "EXP(f5)",
+     math.exp(4.5.toFloat).toString)
+
+   testAllApis(
+     'f6.exp(),
+     "f6.exp()",
+     "EXP(f6)",
+     math.exp(4.6).toString)
+
+   // int column operand (f7 holds 3)
+   testAllApis(
+     'f7.exp(),
+     "f7.exp()",
+     "EXP(f7)",
+     math.exp(3).toString)
+
+   // int literal operand
+   testAllApis(
+     3.exp(),
+     "exp(3)",
+     "EXP(3)",
+     math.exp(3).toString)
+ }
+
+ /**
+  * Checks LOG10 on the byte, short, long, float and double fields (f2 to f6).
+  *
+  * Each entry pairs a field name with the `math.log10` value of what `testData` stores
+  * in that field; the Scala, string and SQL variants are derived from the field name.
+  */
+ @Test
+ def testLog10(): Unit = {
+   val expectedByField = Seq(
+     ("f2", math.log10(42.toByte)),
+     ("f3", math.log10(43.toShort)),
+     ("f4", math.log10(44.toLong)),
+     ("f5", math.log10(4.5.toFloat)),
+     ("f6", math.log10(4.6)))
+
+   expectedByField.foreach { case (field, expected) =>
+     testAllApis(
+       Symbol(field).log10(),
+       s"$field.log10()",
+       s"LOG10($field)",
+       expected.toString)
+   }
+ }
+
+ /**
+  * Checks POWER for combinations of numeric base and exponent types.
+  *
+  * Expected values are computed with `math.pow` on the values stored in `testData`.
+  * The `f4.power(f5)` case is asserted once; the verbatim duplicate was removed.
+  */
+ @Test
+ def testPower(): Unit = {
+   // f7: int , f4: long, f6: double
+   testAllApis(
+     'f2.power('f7),
+     "f2.power(f7)",
+     "POWER(f2, f7)",
+     math.pow(42.toByte, 3).toString)
+
+   testAllApis(
+     'f3.power('f6),
+     "f3.power(f6)",
+     "POWER(f3, f6)",
+     math.pow(43.toShort, 4.6D).toString)
+
+   testAllApis(
+     'f4.power('f5),
+     "f4.power(f5)",
+     "POWER(f4, f5)",
+     math.pow(44.toLong, 4.5.toFloat).toString)
+
+   // f5: float
+   testAllApis(
+     'f5.power('f5),
+     "f5.power(f5)",
+     "power(f5, f5)",
+     math.pow(4.5F, 4.5F).toString)
+
+   testAllApis(
+     'f5.power('f6),
+     "f5.power(f6)",
+     "power(f5, f6)",
+     math.pow(4.5F, 4.6D).toString)
+
+   testAllApis(
+     'f5.power('f7),
+     "f5.power(f7)",
+     "power(f5, f7)",
+     math.pow(4.5F, 3).toString)
+
+   testAllApis(
+     'f5.power('f4),
+     "f5.power(f4)",
+     "power(f5, f4)",
+     math.pow(4.5F, 44L).toString)
+
+   // f22: bigDecimal
+   // TODO delete casting in SQL when CALCITE-1467 is fixed
+   testAllApis(
+     'f22.cast(Types.DOUBLE).power('f5),
+     "f22.cast(DOUBLE).power(f5)",
+     "power(CAST(f22 AS DOUBLE), f5)",
+     math.pow(2, 4.5F).toString)
+
+   testAllApis(
+     'f22.cast(Types.DOUBLE).power('f6),
+     "f22.cast(DOUBLE).power(f6)",
+     "power(CAST(f22 AS DOUBLE), f6)",
+     math.pow(2, 4.6D).toString)
+
+   testAllApis(
+     'f22.cast(Types.DOUBLE).power('f7),
+     "f22.cast(DOUBLE).power(f7)",
+     "power(CAST(f22 AS DOUBLE), f7)",
+     math.pow(2, 3).toString)
+
+   testAllApis(
+     'f22.cast(Types.DOUBLE).power('f4),
+     "f22.cast(DOUBLE).power(f4)",
+     "power(CAST(f22 AS DOUBLE), f4)",
+     math.pow(2, 44L).toString)
+
+   // NOTE(review): unlike the cases above, the SQL variant passes f22 without a CAST
+   // when it is the exponent - confirm this asymmetry is intended.
+   testAllApis(
+     'f6.power('f22.cast(Types.DOUBLE)),
+     "f6.power(f22.cast(DOUBLE))",
+     "power(f6, f22)",
+     math.pow(4.6D, 2).toString)
+ }
+
+ /**
+  * Checks SQRT on double, int, long, cast-decimal and float fields and on two literals.
+  *
+  * Each row of the table holds the Scala expression, the string expression, the SQL
+  * expression and the expected result, in that order.
+  */
+ @Test
+ def testSqrt(): Unit = {
+   val cases = Seq(
+     ('f6.sqrt(), "f6.sqrt", "SQRT(f6)", math.sqrt(4.6D).toString),
+     ('f7.sqrt(), "f7.sqrt", "SQRT(f7)", math.sqrt(3).toString),
+     ('f4.sqrt(), "f4.sqrt", "SQRT(f4)", math.sqrt(44L).toString),
+     ('f22.cast(Types.DOUBLE).sqrt(),
+       "f22.cast(DOUBLE).sqrt",
+       "SQRT(CAST(f22 AS DOUBLE))",
+       math.sqrt(2.0).toString),
+     // the float case derives its expectation from pow(x, 0.5) rather than sqrt
+     ('f5.sqrt(), "f5.sqrt", "SQRT(f5)", math.pow(4.5F, 0.5).toString),
+     (25.sqrt(), "25.sqrt()", "SQRT(25)", "5.0"),
+     (2.2.sqrt(),
+       "2.2.sqrt()",
+       "POWER(CAST(2.2 AS DOUBLE), CAST(0.5 AS DOUBLE))", // TODO fix FLINK-4621
+       math.sqrt(2.2).toString))
+
+   cases.foreach { case (scalaExpr, exprString, sqlExpr, expected) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, expected)
+   }
+ }
+
+ /**
+  * Checks LN on the byte, short, long, float and double fields (f2 to f6).
+  *
+  * Each entry pairs a field name with the `math.log` value of what `testData` stores
+  * in that field; the Scala, string and SQL variants are derived from the field name.
+  */
+ @Test
+ def testLn(): Unit = {
+   val expectedByField = Seq(
+     ("f2", math.log(42.toByte)),
+     ("f3", math.log(43.toShort)),
+     ("f4", math.log(44.toLong)),
+     ("f5", math.log(4.5.toFloat)),
+     ("f6", math.log(4.6)))
+
+   expectedByField.foreach { case (field, expected) =>
+     testAllApis(
+       Symbol(field).ln(),
+       s"$field.ln()",
+       s"LN($field)",
+       expected.toString)
+   }
+ }
+
+ /**
+  * Checks ABS on the positive numeric fields (f2 to f6), their negative counterparts
+  * (f9 to f13) and the negative decimal field f15.
+  *
+  * Each entry pairs a field name with the expected absolute value as a string.
+  */
+ @Test
+ def testAbs(): Unit = {
+   val expectedByField = Seq(
+     // positive inputs
+     "f2" -> "42",
+     "f3" -> "43",
+     "f4" -> "44",
+     "f5" -> "4.5",
+     "f6" -> "4.6",
+     // negative inputs
+     "f9" -> "42",
+     "f10" -> "43",
+     "f11" -> "44",
+     "f12" -> "4.5",
+     "f13" -> "4.6",
+     // negative decimal input
+     "f15" -> "1231.1231231321321321111")
+
+   expectedByField.foreach { case (field, expected) =>
+     testAllApis(
+       Symbol(field).abs(),
+       s"$field.abs()",
+       s"ABS($field)",
+       expected)
+   }
+ }
+
+ /**
+  * Checks numeric FLOOR and CEIL on the float field f5, the short field f3 and the
+  * negative decimal field f15.
+  *
+  * Each entry holds the field name, the expected floor and the expected ceil; the
+  * floor case of a field is registered before its ceil case.
+  */
+ @Test
+ def testArithmeticFloorCeil(): Unit = {
+   val expectations = Seq(
+     ("f5", "4.0", "5.0"),
+     ("f3", "43", "43"),
+     ("f15", "-1232", "-1231"))
+
+   expectations.foreach { case (field, floored, ceiled) =>
+     testAllApis(
+       Symbol(field).floor(),
+       s"$field.floor()",
+       s"FLOOR($field)",
+       floored)
+
+     testAllApis(
+       Symbol(field).ceil(),
+       s"$field.ceil()",
+       s"CEIL($field)",
+       ceiled)
+   }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // Temporal functions
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+  * Checks EXTRACT on the date (f16), time (f17), timestamp (f18), millisecond-interval
+  * (f19) and month-interval (f20) fields.
+  *
+  * Each row of the table holds the Scala expression, the string expression, the SQL
+  * expression and the expected result, in that order.
+  */
+ @Test
+ def testExtract(): Unit = {
+   val cases = Seq(
+     // date field
+     ('f16.extract(TimeIntervalUnit.YEAR), "f16.extract(YEAR)", "EXTRACT(YEAR FROM f16)", "1996"),
+     // this case uses the prefix spelling of the string expression
+     ('f16.extract(TimeIntervalUnit.MONTH), "extract(f16, MONTH)", "EXTRACT(MONTH FROM f16)", "11"),
+     ('f16.extract(TimeIntervalUnit.DAY), "f16.extract(DAY)", "EXTRACT(DAY FROM f16)", "10"),
+     // timestamp and time fields
+     ('f18.extract(TimeIntervalUnit.YEAR), "f18.extract(YEAR)", "EXTRACT(YEAR FROM f18)", "1996"),
+     ('f18.extract(TimeIntervalUnit.MONTH), "f18.extract(MONTH)", "EXTRACT(MONTH FROM f18)", "11"),
+     ('f18.extract(TimeIntervalUnit.DAY), "f18.extract(DAY)", "EXTRACT(DAY FROM f18)", "10"),
+     ('f18.extract(TimeIntervalUnit.HOUR), "f18.extract(HOUR)", "EXTRACT(HOUR FROM f18)", "6"),
+     ('f17.extract(TimeIntervalUnit.HOUR), "f17.extract(HOUR)", "EXTRACT(HOUR FROM f17)", "6"),
+     ('f18.extract(TimeIntervalUnit.MINUTE),
+       "f18.extract(MINUTE)",
+       "EXTRACT(MINUTE FROM f18)",
+       "55"),
+     ('f17.extract(TimeIntervalUnit.MINUTE),
+       "f17.extract(MINUTE)",
+       "EXTRACT(MINUTE FROM f17)",
+       "55"),
+     ('f18.extract(TimeIntervalUnit.SECOND),
+       "f18.extract(SECOND)",
+       "EXTRACT(SECOND FROM f18)",
+       "44"),
+     ('f17.extract(TimeIntervalUnit.SECOND),
+       "f17.extract(SECOND)",
+       "EXTRACT(SECOND FROM f17)",
+       "44"),
+     // millisecond interval field
+     ('f19.extract(TimeIntervalUnit.DAY), "f19.extract(DAY)", "EXTRACT(DAY FROM f19)", "16979"),
+     ('f19.extract(TimeIntervalUnit.HOUR), "f19.extract(HOUR)", "EXTRACT(HOUR FROM f19)", "7"),
+     ('f19.extract(TimeIntervalUnit.MINUTE),
+       "f19.extract(MINUTE)",
+       "EXTRACT(MINUTE FROM f19)",
+       "23"),
+     ('f19.extract(TimeIntervalUnit.SECOND),
+       "f19.extract(SECOND)",
+       "EXTRACT(SECOND FROM f19)",
+       "33"),
+     // month interval field
+     ('f20.extract(TimeIntervalUnit.MONTH), "f20.extract(MONTH)", "EXTRACT(MONTH FROM f20)", "1"),
+     ('f20.extract(TimeIntervalUnit.YEAR), "f20.extract(YEAR)", "EXTRACT(YEAR FROM f20)", "2"))
+
+   cases.foreach { case (scalaExpr, exprString, sqlExpr, expected) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, expected)
+   }
+ }
+
+ /**
+  * Checks temporal FLOOR and, despite the method name, temporal CEIL on the timestamp
+  * (f18), time (f17) and date (f16) fields.
+  *
+  * Each row of the table holds the Scala expression, the string expression, the SQL
+  * expression and the expected result, in that order.
+  */
+ @Test
+ def testTemporalFloor(): Unit = {
+   val floorCases = Seq(
+     ('f18.floor(TimeIntervalUnit.YEAR),
+       "f18.floor(YEAR)", "FLOOR(f18 TO YEAR)", "1996-01-01 00:00:00.0"),
+     ('f18.floor(TimeIntervalUnit.MONTH),
+       "f18.floor(MONTH)", "FLOOR(f18 TO MONTH)", "1996-11-01 00:00:00.0"),
+     ('f18.floor(TimeIntervalUnit.DAY),
+       "f18.floor(DAY)", "FLOOR(f18 TO DAY)", "1996-11-10 00:00:00.0"),
+     ('f18.floor(TimeIntervalUnit.MINUTE),
+       "f18.floor(MINUTE)", "FLOOR(f18 TO MINUTE)", "1996-11-10 06:55:00.0"),
+     ('f18.floor(TimeIntervalUnit.SECOND),
+       "f18.floor(SECOND)", "FLOOR(f18 TO SECOND)", "1996-11-10 06:55:44.0"),
+     ('f17.floor(TimeIntervalUnit.HOUR),
+       "f17.floor(HOUR)", "FLOOR(f17 TO HOUR)", "06:00:00"),
+     ('f17.floor(TimeIntervalUnit.MINUTE),
+       "f17.floor(MINUTE)", "FLOOR(f17 TO MINUTE)", "06:55:00"),
+     ('f17.floor(TimeIntervalUnit.SECOND),
+       "f17.floor(SECOND)", "FLOOR(f17 TO SECOND)", "06:55:44"),
+     ('f16.floor(TimeIntervalUnit.YEAR),
+       "f16.floor(YEAR)", "FLOOR(f16 TO YEAR)", "1996-01-01"),
+     ('f16.floor(TimeIntervalUnit.MONTH),
+       "f16.floor(MONTH)", "FLOOR(f16 TO MONTH)", "1996-11-01"))
+
+   val ceilCases = Seq(
+     ('f18.ceil(TimeIntervalUnit.YEAR),
+       "f18.ceil(YEAR)", "CEIL(f18 TO YEAR)", "1997-01-01 00:00:00.0"),
+     ('f18.ceil(TimeIntervalUnit.MONTH),
+       "f18.ceil(MONTH)", "CEIL(f18 TO MONTH)", "1996-12-01 00:00:00.0"),
+     ('f18.ceil(TimeIntervalUnit.DAY),
+       "f18.ceil(DAY)", "CEIL(f18 TO DAY)", "1996-11-11 00:00:00.0"),
+     ('f18.ceil(TimeIntervalUnit.MINUTE),
+       "f18.ceil(MINUTE)", "CEIL(f18 TO MINUTE)", "1996-11-10 06:56:00.0"),
+     ('f18.ceil(TimeIntervalUnit.SECOND),
+       "f18.ceil(SECOND)", "CEIL(f18 TO SECOND)", "1996-11-10 06:55:45.0"),
+     ('f17.ceil(TimeIntervalUnit.HOUR),
+       "f17.ceil(HOUR)", "CEIL(f17 TO HOUR)", "07:00:00"),
+     ('f17.ceil(TimeIntervalUnit.MINUTE),
+       "f17.ceil(MINUTE)", "CEIL(f17 TO MINUTE)", "06:56:00"),
+     ('f17.ceil(TimeIntervalUnit.SECOND),
+       "f17.ceil(SECOND)", "CEIL(f17 TO SECOND)", "06:55:44"),
+     // NOTE(review): the two DATE ceil expectations below equal the corresponding floor
+     // results for f16 (1996-11-10); looks like they pin current behavior rather than a
+     // rounded-up date - confirm against the implementation.
+     ('f16.ceil(TimeIntervalUnit.YEAR),
+       "f16.ceil(YEAR)", "CEIL(f16 TO YEAR)", "1996-01-01"),
+     ('f16.ceil(TimeIntervalUnit.MONTH),
+       "f16.ceil(MONTH)", "CEIL(f16 TO MONTH)", "1996-11-01"))
+
+   floorCases.foreach { case (scalaExpr, exprString, sqlExpr, expected) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, expected)
+   }
+
+   ceilCases.foreach { case (scalaExpr, exprString, sqlExpr, expected) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, expected)
+   }
+ }
+
+ /**
+  * Checks the current date/time functions without depending on the actual clock value.
+  *
+  * Current time points are non-deterministic, so only a lower bound on the length of
+  * their string form is asserted; manual tests can be found in NonDeterministicTests.
+  * Every case expects "true".
+  */
+ @Test
+ def testCurrentTimePoint(): Unit = {
+   val cases = Seq(
+     (currentDate().cast(Types.STRING).charLength() >= 5,
+       "currentDate().cast(STRING).charLength() >= 5",
+       "CHAR_LENGTH(CAST(CURRENT_DATE AS VARCHAR)) >= 5"),
+     (currentTime().cast(Types.STRING).charLength() >= 5,
+       "currentTime().cast(STRING).charLength() >= 5",
+       "CHAR_LENGTH(CAST(CURRENT_TIME AS VARCHAR)) >= 5"),
+     (currentTimestamp().cast(Types.STRING).charLength() >= 12,
+       "currentTimestamp().cast(STRING).charLength() >= 12",
+       "CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR)) >= 12"),
+     (localTimestamp().cast(Types.STRING).charLength() >= 12,
+       "localTimestamp().cast(STRING).charLength() >= 12",
+       "CHAR_LENGTH(CAST(LOCALTIMESTAMP AS VARCHAR)) >= 12"),
+     (localTime().cast(Types.STRING).charLength() >= 5,
+       "localTime().cast(STRING).charLength() >= 5",
+       "CHAR_LENGTH(CAST(LOCALTIME AS VARCHAR)) >= 5"),
+     // comparisons are deterministic
+     (localTimestamp() === localTimestamp(),
+       "localTimestamp() === localTimestamp()",
+       "LOCALTIMESTAMP = LOCALTIMESTAMP"))
+
+   cases.foreach { case (scalaExpr, exprString, sqlExpr) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, "true")
+   }
+ }
+
+ /**
+  * Checks the OVERLAPS predicate for time, date and timestamp periods, where a period
+  * is given either as two time points or as a time point plus an interval.
+  *
+  * Each row of the table holds the Scala expression, the string expression, the SQL
+  * expression and the expected result, in that order.
+  */
+ @Test
+ def testOverlaps(): Unit = {
+   val cases = Seq(
+     (temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hours),
+       "temporalOverlaps('2:55:00'.toTime, 1.hour, '3:30:00'.toTime, 2.hours)",
+       "(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR)",
+       "true"),
+     (temporalOverlaps("9:00:00".toTime, "9:30:00".toTime, "9:29:00".toTime, "9:31:00".toTime),
+       "temporalOverlaps('9:00:00'.toTime, '9:30:00'.toTime, '9:29:00'.toTime, '9:31:00'.toTime)",
+       "(TIME '9:00:00', TIME '9:30:00') OVERLAPS (TIME '9:29:00', TIME '9:31:00')",
+       "true"),
+     (temporalOverlaps("9:00:00".toTime, "10:00:00".toTime, "10:15:00".toTime, 3.hours),
+       "temporalOverlaps('9:00:00'.toTime, '10:00:00'.toTime, '10:15:00'.toTime, 3.hours)",
+       "(TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR)",
+       "false"),
+     (temporalOverlaps("2011-03-10".toDate, 10.days, "2011-03-19".toDate, 10.days),
+       "temporalOverlaps('2011-03-10'.toDate, 10.days, '2011-03-19'.toDate, 10.days)",
+       "(DATE '2011-03-10', INTERVAL '10' DAY) OVERLAPS (DATE '2011-03-19', INTERVAL '10' DAY)",
+       "true"),
+     (temporalOverlaps("2011-03-10 05:02:02".toTimestamp, 0.milli,
+         "2011-03-10 05:02:02".toTimestamp, "2011-03-10 05:02:01".toTimestamp),
+       "temporalOverlaps('2011-03-10 05:02:02'.toTimestamp, 0.milli, " +
+         "'2011-03-10 05:02:02'.toTimestamp, '2011-03-10 05:02:01'.toTimestamp)",
+       "(TIMESTAMP '2011-03-10 05:02:02', INTERVAL '0' SECOND) OVERLAPS " +
+         "(TIMESTAMP '2011-03-10 05:02:02', TIMESTAMP '2011-03-10 05:02:01')",
+       "false"))
+
+   cases.foreach { case (scalaExpr, exprString, sqlExpr, expected) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, expected)
+   }
+
+   // TODO enable once CALCITE-1435 is fixed
+   // comparison of timestamps based on milliseconds is buggy
+   //testAllApis(
+   // temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli,
+   // "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp),
+   // "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " +
+   // "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)",
+   // "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " +
+   // "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')",
+   // "false")
+ }
+
+ /**
+  * Checks QUARTER on three date literals through the Scala, string and SQL APIs.
+  *
+  * Each entry pairs a date string with the expected quarter; the three API variants
+  * are derived from the date string.
+  */
+ @Test
+ def testQuarter(): Unit = {
+   val quarterByDate = Seq(
+     "1997-01-27" -> "1",
+     "1997-04-27" -> "2",
+     "1997-12-31" -> "4")
+
+   quarterByDate.foreach { case (date, quarter) =>
+     testAllApis(
+       date.toDate.quarter(),
+       s"'$date'.toDate.quarter()",
+       s"QUARTER(DATE '$date')",
+       quarter)
+   }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+ // Other functions
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+  * Checks IS TRUE, IS FALSE, IS NOT TRUE and IS NOT FALSE on a true field (f1), a null
+  * boolean field (f21) and the literal `false`.
+  *
+  * Each row of the table holds the Scala expression, the string expression, the SQL
+  * expression and the expected result, in that order.
+  */
+ @Test
+ def testIsTrueIsFalse(): Unit = {
+   val cases = Seq(
+     ('f1.isTrue, "f1.isTrue", "f1 IS TRUE", "true"),
+     ('f21.isTrue, "f21.isTrue", "f21 IS TRUE", "false"),
+     (false.isFalse, "false.isFalse", "FALSE IS FALSE", "true"),
+     ('f21.isFalse, "f21.isFalse", "f21 IS FALSE", "false"),
+     ('f1.isNotTrue, "f1.isNotTrue", "f1 IS NOT TRUE", "false"),
+     ('f21.isNotTrue, "f21.isNotTrue", "f21 IS NOT TRUE", "true"),
+     (false.isNotFalse, "false.isNotFalse", "FALSE IS NOT FALSE", "false"),
+     ('f21.isNotFalse, "f21.isNotFalse", "f21 IS NOT FALSE", "true"))
+
+   cases.foreach { case (scalaExpr, exprString, sqlExpr, expected) =>
+     testAllApis(scalaExpr, exprString, sqlExpr, expected)
+   }
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ /**
+  * Builds the 25-field input row that the expressions in this class are evaluated on.
+  *
+  * The values are listed in field order, so the position of a value in the sequence
+  * is the index of the field (f0 to f24) it is stored in.
+  */
+ def testData = {
+   val fieldValues = Seq[Any](
+     "This is a test String.", // f0
+     true, // f1
+     42.toByte, // f2
+     43.toShort, // f3
+     44.toLong, // f4
+     4.5.toFloat, // f5
+     4.6, // f6
+     3, // f7
+     " This is a test String. ", // f8
+     -42.toByte, // f9
+     -43.toShort, // f10
+     -44.toLong, // f11
+     -4.5.toFloat, // f12
+     -4.6, // f13
+     -3, // f14
+     BigDecimal("-1231.1231231321321321111").bigDecimal, // f15
+     Date.valueOf("1996-11-10"), // f16
+     Time.valueOf("06:55:44"), // f17
+     Timestamp.valueOf("1996-11-10 06:55:44.333"), // f18
+     1467012213000L, // f19: +16979 07:23:33.000
+     25, // f20: +2-01
+     null, // f21
+     BigDecimal("2").bigDecimal, // f22
+     "%This is a test String.", // f23
+     "*_This is a test String.") // f24
+
+   val row = new Row(fieldValues.length)
+   fieldValues.zipWithIndex.foreach { case (value, index) =>
+     row.setField(index, value)
+   }
+   row
+ }
+
+ /**
+  * Describes the field types of the row returned by `testData`, in field order
+  * (f0 to f24), as a row type cast to `TypeInformation[Any]`.
+  */
+ def typeInfo = {
+   val fieldTypes = Seq[TypeInformation[_]](
+     Types.STRING, // f0
+     Types.BOOLEAN, // f1
+     Types.BYTE, // f2
+     Types.SHORT, // f3
+     Types.LONG, // f4
+     Types.FLOAT, // f5
+     Types.DOUBLE, // f6
+     Types.INT, // f7
+     Types.STRING, // f8
+     Types.BYTE, // f9
+     Types.SHORT, // f10
+     Types.LONG, // f11
+     Types.FLOAT, // f12
+     Types.DOUBLE, // f13
+     Types.INT, // f14
+     Types.DECIMAL, // f15
+     Types.DATE, // f16
+     Types.TIME, // f17
+     Types.TIMESTAMP, // f18
+     Types.INTERVAL_MILLIS, // f19
+     Types.INTERVAL_MONTHS, // f20
+     Types.BOOLEAN, // f21
+     Types.DECIMAL, // f22
+     Types.STRING, // f23
+     Types.STRING) // f24
+
+   val rowType = new RowTypeInfo(fieldTypes: _*)
+   rowType.asInstanceOf[TypeInformation[Any]]
+ }
+}