You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:32 UTC

[03/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
new file mode 100644
index 0000000..f35ee76
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnionITCase.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+/**
+  * Integration tests for `unionAll` on streaming tables.
+  *
+  * Every test that executes a job resets the shared `StreamITCase.testResults` buffer first
+  * and collects the emitted rows through a `StreamITCase.StringSink`.
+  */
+class UnionITCase extends StreamingMultipleProgramsTestBase {
+
+  /** Unions the small 3-tuple stream with itself and expects every `c` value twice. */
+  @Test
+  def testUnion(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'd, 'e, 'f)
+
+    val unionDs = ds1.unionAll(ds2).select('c)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    // Arrival order is not asserted: both sides are sorted before the comparison.
+    val expected = mutable.MutableList(
+        "Hi", "Hello", "Hello world", "Hi", "Hello", "Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /**
+    * Unions the small 3-tuple stream with a projection of the 5-tuple stream and keeps only
+    * rows with `b < 2`, i.e. one row from each input.
+    */
+  @Test
+  def testUnionWithFilter(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2.select('a, 'b, 'c)).filter('b < 2).select('c)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("Hi", "Hallo")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  /** A 3-field table unioned with a 5-field table must raise a [[ValidationException]]. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionFieldsNameNotOverlap1(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertTrue(StreamITCase.testResults.isEmpty)
+  }
+
+  /**
+    * Both inputs expose fields `a, b, c`, but the third column of the projected 5-tuple
+    * stream is an `Int` while it is a `String` in the 3-tuple stream; a
+    * [[ValidationException]] is expected.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testUnionFieldsNameNotOverlap2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = StreamTestData.get5TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    val unionDs = ds1.unionAll(ds2)
+
+    val results = unionDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertTrue(StreamITCase.testResults.isEmpty)
+  }
+
+  /** Tables created from two different table environments cannot be unioned. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv2, 'a, 'b, 'c)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.unionAll(ds2)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
new file mode 100644
index 0000000..b6a6660
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UnsupportedOpsTest.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.table.api.scala.stream.utils.StreamTestData
+import org.apache.flink.table.api.scala._
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit.Test
+
+/**
+  * Checks that Table API operations which are not available on streaming tables are
+  * rejected with a [[ValidationException]] as soon as they are applied.
+  */
+class UnsupportedOpsTest extends StreamingMultipleProgramsTestBase {
+
+  /**
+    * Creates a fresh execution and table environment and returns a factory that, on every
+    * call, turns the small 3-tuple test stream into a new table bound to that environment.
+    */
+  private def smallTableFactory() = {
+    val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    () => StreamTestData.getSmall3TupleDataStream(execEnv).toTable(tableEnv)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectWithAggregation(): Unit = {
+    val newTable = smallTableFactory()
+    newTable().select('_1.min)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testDistinct(): Unit = {
+    val newTable = smallTableFactory()
+    newTable().distinct()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSort(): Unit = {
+    val newTable = smallTableFactory()
+    newTable().orderBy('_1.desc)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoin(): Unit = {
+    val newTable = smallTableFactory()
+    val left = newTable()
+    val right = newTable()
+    left.join(right)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnion(): Unit = {
+    val newTable = smallTableFactory()
+    val left = newTable()
+    val right = newTable()
+    left.union(right)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersect(): Unit = {
+    val newTable = smallTableFactory()
+    val left = newTable()
+    val right = newTable()
+    left.intersect(right)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testIntersectAll(): Unit = {
+    val newTable = smallTableFactory()
+    val left = newTable()
+    val right = newTable()
+    left.intersectAll(right)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinus(): Unit = {
+    val newTable = smallTableFactory()
+    val left = newTable()
+    val right = newTable()
+    left.minus(right)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testMinusAll(): Unit = {
+    val newTable = smallTableFactory()
+    val left = newTable()
+    val right = newTable()
+    left.minusAll(right)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testLimit(): Unit = {
+    val newTable = smallTableFactory()
+    newTable().limit(0, 5)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
new file mode 100644
index 0000000..168f9ec
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/UserDefinedTableFunctionTest.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api._
+import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils._
+import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
+import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaExecutionEnv}
+import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment => ScalaExecutionEnv}
+import org.junit.Assert.{assertTrue, fail}
+import org.junit.Test
+import org.mockito.Mockito._
+
+class UserDefinedTableFunctionTest extends TableTestBase {
+
+  /**
+    * Builds the same table-function joins once with the Scala expression DSL and once with
+    * the string-based Java API and checks each pair with `verifyTableEquals`.
+    */
+  @Test
+  def testJavaScalaTableAPIEquality(): Unit = {
+    // mocked Scala stream backed by a mocked Java stream of (INT, LONG, STRING) rows
+    val scalaStream = mock(classOf[DataStream[Row]])
+    val javaStream = mock(classOf[JDataStream[Row]])
+    val rowType = new RowTypeInfo(Seq(Types.INT, Types.LONG, Types.STRING): _*)
+    when(scalaStream.javaStream).thenReturn(javaStream)
+    when(javaStream.getType).thenReturn(rowType)
+
+    // Scala environment
+    val scalaEnv = mock(classOf[ScalaExecutionEnv])
+    val tableEnv = TableEnvironment.getTableEnvironment(scalaEnv)
+    val scalaInput = scalaStream.toTable(tableEnv).as('a, 'b, 'c)
+
+    // Java environment
+    val javaEnv = mock(classOf[JavaExecutionEnv])
+    val javaTableEnv = TableEnvironment.getTableEnvironment(javaEnv)
+    val javaInput = javaTableEnv.fromDataStream(javaStream).as("a, b, c")
+
+    // test cross join
+    val func1 = new TableFunc1
+    javaTableEnv.registerFunction("func1", func1)
+    verifyTableEquals(
+      scalaInput.join(func1('c) as 's).select('c, 's),
+      javaInput.join("func1(c).as(s)").select("c, s"))
+
+    // test left outer join
+    verifyTableEquals(
+      scalaInput.leftOuterJoin(func1('c) as 's).select('c, 's),
+      javaInput.leftOuterJoin("as(func1(c), s)").select("c, s"))
+
+    // test overloading
+    verifyTableEquals(
+      scalaInput.join(func1('c, "$") as 's).select('c, 's),
+      javaInput.join("func1(c, '$') as (s)").select("c, s"))
+
+    // test custom result type
+    val func2 = new TableFunc2
+    javaTableEnv.registerFunction("func2", func2)
+    verifyTableEquals(
+      scalaInput.join(func2('c) as ('name, 'len)).select('c, 'name, 'len),
+      javaInput.join("func2(c).as(name, len)").select("c, name, len"))
+
+    // test hierarchy generic type
+    val hierarchy = new HierarchyTableFunction
+    javaTableEnv.registerFunction("hierarchy", hierarchy)
+    verifyTableEquals(
+      scalaInput.join(hierarchy('c) as ('name, 'adult, 'len)).select('c, 'name, 'len, 'adult),
+      javaInput.join("AS(hierarchy(c), name, adult, len)").select("c, name, len, adult"))
+
+    // test pojo type
+    val pojo = new PojoTableFunc
+    javaTableEnv.registerFunction("pojo", pojo)
+    verifyTableEquals(
+      scalaInput.join(pojo('c)).select('c, 'name, 'age),
+      javaInput.join("pojo(c)").select("c, name, age"))
+
+    // test with filter
+    verifyTableEquals(
+      scalaInput.join(func2('c) as ('name, 'len)).select('c, 'name, 'len).filter('len > 2),
+      javaInput.join("func2(c) as (name, len)").select("c, name, len").filter("len > 2"))
+
+    // test with scalar function
+    verifyTableEquals(
+      scalaInput.join(func1('c.substring(2)) as 's).select('a, 'c, 's),
+      javaInput.join("func1(substring(c, 2)) as (s)").select("a, c, s"))
+
+    // check scala object is forbidden
+    expectExceptionThrown(
+      tableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(
+      javaTableEnv.registerFunction("func3", ObjectTableFunction), "Scala object")
+    expectExceptionThrown(
+      scalaInput.join(ObjectTableFunction('a, 1)), "Scala object")
+  }
+
+  /**
+    * Exercises the failure paths of table function usage: Scala objects as functions,
+    * unregistered functions, scalar functions used as table functions, and calls whose
+    * arguments match no signature.
+    */
+  @Test
+  def testInvalidTableFunction(): Unit = {
+    // mock
+    val util = streamTestUtil()
+    val myTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val javaTableEnv = TableEnvironment.getTableEnvironment(mock(classOf[JavaExecutionEnv]))
+
+    //=================== check scala object is forbidden =====================
+    // registration through the Scala table environment
+    expectExceptionThrown(util.addFunction("udtf", ObjectTableFunction), "Scala object")
+    // registration through the Java table environment
+    expectExceptionThrown(
+      javaTableEnv.registerFunction("udtf", ObjectTableFunction),
+      "Scala object")
+    // direct call in the Scala Table API
+    expectExceptionThrown(myTable.join(ObjectTableFunction('a, 1)), "Scala object")
+
+    //============ throw exception when table function is not registered =========
+    // Java Table API call
+    expectExceptionThrown(myTable.join("nonexist(a)"), "Undefined function: NONEXIST")
+    // SQL API call
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(nonexist(a))"),
+      "No match found for function signature nonexist(<NUMERIC>)")
+
+    //========= throw exception when the called function is a scalar function ====
+    util.addFunction("func0", Func0)
+    // Java Table API call
+    expectExceptionThrown(
+      myTable.join("func0(a)"),
+      "only accept expressions that define table functions",
+      clazz = classOf[TableException])
+    // SQL API call
+    // NOTE: this surfaces as an AssertionError rather than an exception, maybe a Calcite bug
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func0(a))"),
+      keywords = null,
+      clazz = classOf[AssertionError])
+
+    //========== throw exception when the parameters is not correct ===============
+    // Java Table API call
+    util.addFunction("func2", new TableFunc2)
+    expectExceptionThrown(
+      myTable.join("func2(c, c)"),
+      "Given parameters of function 'FUNC2' do not match any signature")
+    // SQL API call
+    expectExceptionThrown(
+      util.tEnv.sql("SELECT * FROM MyTable, LATERAL TABLE(func2(c, c))"),
+      "No match found for function signature func2(<CHARACTER>, <CHARACTER>)")
+  }
+
+  /**
+    * Joins the table with `func1` (one- and two-argument overloads) and verifies each
+    * result against the expected Calc-over-Correlate plan description.
+    */
+  @Test
+  def testCrossJoin(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("func1", new TableFunc1)
+
+    // Both cases share the same plan shape and differ only in the invocation term.
+    def expectedPlan(invocation: String) = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamCorrelate",
+        streamTableNode(0),
+        term("invocation", invocation),
+        term("function", udtf),
+        term("rowType",
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+        term("joinType", "INNER")
+      ),
+      term("select", "c", "s")
+    )
+
+    // single-argument call
+    val singleArgJoin = sourceTable.join(udtf('c) as 's).select('c, 's)
+    util.verifyTable(singleArgJoin, expectedPlan(s"$udtf($$2)"))
+
+    // test overloading
+    val twoArgJoin = sourceTable.join(udtf('c, "$") as 's).select('c, 's)
+    util.verifyTable(twoArgJoin, expectedPlan(s"$udtf($$2, '$$')"))
+  }
+
+  /** Left-outer-joins the table with `func1` and verifies the expected plan description. */
+  @Test
+  def testLeftOuterJoin(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("func1", new TableFunc1)
+
+    val joined = sourceTable.leftOuterJoin(udtf('c) as 's).select('c, 's)
+
+    // inner node: the correlate with join type LEFT
+    val correlate = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$udtf($$2)"),
+      term("function", udtf),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+      term("joinType", "LEFT")
+    )
+    // outer node: the projection onto (c, s)
+    val expectedPlan = unaryNode("DataStreamCalc", correlate, term("select", "c", "s"))
+
+    util.verifyTable(joined, expectedPlan)
+  }
+
+  /**
+    * Joins the table with `func2`, naming its two result fields `name` and `len`, and
+    * verifies the expected plan description.
+    */
+  @Test
+  def testCustomType(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("func2", new TableFunc2)
+
+    val joined = sourceTable.join(udtf('c) as ('name, 'len)).select('c, 'name, 'len)
+
+    val rowType =
+      "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+        "VARCHAR(2147483647) name, INTEGER len)"
+    val correlate = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$udtf($$2)"),
+      term("function", udtf),
+      term("rowType", rowType),
+      term("joinType", "INNER")
+    )
+    val expectedPlan =
+      unaryNode("DataStreamCalc", correlate, term("select", "c", "name", "len"))
+
+    util.verifyTable(joined, expectedPlan)
+  }
+
+  /**
+    * Joins the table with the `hierarchy` table function (fields `name`, `adult`, `len`)
+    * and verifies the expected Correlate plan description.
+    */
+  @Test
+  def testHierarchyType(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("hierarchy", new HierarchyTableFunction)
+
+    val joined = sourceTable.join(udtf('c) as ('name, 'adult, 'len))
+
+    val rowType =
+      "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
+        " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"
+    val expectedPlan = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$udtf($$2)"),
+      term("function", udtf),
+      term("rowType", rowType),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(joined, expectedPlan)
+  }
+
+  /**
+    * Joins the table with the `pojo` table function without renaming its fields and
+    * verifies the expected Correlate plan description.
+    */
+  @Test
+  def testPojoType(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("pojo", new PojoTableFunc)
+
+    val joined = sourceTable.join(udtf('c))
+
+    val rowType =
+      "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+        "INTEGER age, VARCHAR(2147483647) name)"
+    val expectedPlan = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$udtf($$2)"),
+      term("function", udtf),
+      term("rowType", rowType),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(joined, expectedPlan)
+  }
+
+  /**
+    * Joins the table with `func2`, projects, then filters on `len > 2`; the expected plan
+    * description carries the filter as a `condition` term of the Correlate node.
+    */
+  @Test
+  def testFilter(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("func2", new TableFunc2)
+
+    val joined = sourceTable.join(udtf('c) as ('name, 'len))
+    val filtered = joined.select('c, 'name, 'len).filter('len > 2)
+
+    val rowType =
+      "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
+        "VARCHAR(2147483647) name, INTEGER len)"
+    val correlate = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", s"$udtf($$2)"),
+      term("function", udtf),
+      term("rowType", rowType),
+      term("joinType", "INNER"),
+      term("condition", ">($1, 2)")
+    )
+    val expectedPlan =
+      unaryNode("DataStreamCalc", correlate, term("select", "c", "name", "len"))
+
+    util.verifyTable(filtered, expectedPlan)
+  }
+
+  /**
+    * Passes a scalar expression (`'c.substring(2)`) as the argument of `func1` and verifies
+    * the expected Correlate plan description.
+    */
+  @Test
+  def testScalarFunction(): Unit = {
+    val util = streamTestUtil()
+    val sourceTable = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+    val udtf = util.addFunction("func1", new TableFunc1)
+
+    val joined = sourceTable.join(udtf('c.substring(2)) as 's)
+
+    val invocation = s"$udtf(SUBSTRING($$2, 2, CHAR_LENGTH($$2)))"
+    val expectedPlan = unaryNode(
+      "DataStreamCorrelate",
+      streamTableNode(0),
+      term("invocation", invocation),
+      term("function", udtf),
+      term("rowType",
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, VARCHAR(2147483647) s)"),
+      term("joinType", "INNER")
+    )
+
+    util.verifyTable(joined, expectedPlan)
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Evaluates `function` and asserts that it throws a throwable whose class is exactly
+    * `clazz` (subclasses do not match).
+    *
+    * @param function code under test, evaluated exactly once
+    * @param keywords text that the exception message must contain; `null` skips the
+    *                 message check
+    * @param clazz    exact class of the expected throwable
+    */
+  private def expectExceptionThrown(
+      function: => Unit,
+      keywords: String,
+      clazz: Class[_ <: Throwable] = classOf[ValidationException])
+    : Unit = {
+    // Only the code under test runs inside the try block. If fail() were called in there,
+    // its AssertionError would be caught below and, when clazz is AssertionError itself,
+    // be mistaken for the expected throwable.
+    // Throwable is caught on purpose: callers also expect Errors such as AssertionError.
+    val thrown: Option[Throwable] =
+      try {
+        function
+        None
+      } catch {
+        case t: Throwable => Some(t)
+      }
+
+    thrown match {
+      case None =>
+        fail(s"Expected a $clazz, but no exception is thrown.")
+      case Some(e) if e.getClass == clazz =>
+        if (keywords != null) {
+          val message = e.getMessage
+          // A throwable without a message fails the check instead of raising an NPE.
+          assertTrue(
+            s"The exception message '$message' doesn't contain keyword '$keywords'",
+            message != null && message.contains(keywords))
+        }
+      case Some(e) =>
+        fail(s"Expected throw ${clazz.getSimpleName}, but is $e.")
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
new file mode 100644
index 0000000..f826bba
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamITCase.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.utils
+
+import java.util.Collections
+
+import org.apache.flink.types.Row
+import org.junit.Assert._
+
+import scala.collection.mutable
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+
+import scala.collection.JavaConverters._
+
+/**
+  * Shared result holder for streaming integration tests: sinks append the string form of
+  * every received row to `testResults`, and tests compare that buffer with their expected
+  * values.
+  */
+object StreamITCase {
+
+  /** Rows collected by [[StringSink]]; tests reassign or clear it before running a job. */
+  var testResults: mutable.MutableList[String] = mutable.MutableList.empty[String]
+
+  /** Removes all collected results. */
+  def clear(): Unit = {
+    StreamITCase.testResults.clear()
+  }
+
+  /**
+    * Asserts that the collected results equal `expected`, ignoring order.
+    *
+    * @param expected the expected result strings; the list itself is left untouched
+    */
+  def compareWithList(expected: java.util.List[String]): Unit = {
+    // Sort a private copy so the caller's list is not reordered and unmodifiable lists work.
+    val sortedExpected = new java.util.ArrayList[String](expected)
+    Collections.sort(sortedExpected)
+    assertEquals(sortedExpected.asScala, StreamITCase.testResults.sorted)
+  }
+
+  /** Sink that appends `value.toString` of every row to `testResults`. */
+  final class StringSink extends RichSinkFunction[Row]() {
+    def invoke(value: Row): Unit = {
+      // NOTE(review): the monitor is the list currently referenced by `testResults`;
+      // reassigning the var while a job is running would split the lock.
+      testResults.synchronized {
+        testResults += value.toString
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
new file mode 100644
index 0000000..6745039
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/utils/StreamTestData.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.stream.utils
+
+import org.apache.flink.api.scala._
+import scala.collection.mutable
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+
+/** Fixed in-memory data sets, exposed as data streams, for the streaming Table API tests. */
+object StreamTestData {
+
+  /** Returns a stream of three `(Int, Long, String)` records. */
+  def getSmall3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
+    val rows = mutable.MutableList[(Int, Long, String)](
+      (1, 1L, "Hi"),
+      (2, 2L, "Hello"),
+      (3, 2L, "Hello world"))
+    env.fromCollection(rows)
+  }
+
+  /** Returns a stream of 21 `(Int, Long, String)` records. */
+  def get3TupleDataStream(env: StreamExecutionEnvironment): DataStream[(Int, Long, String)] = {
+    val rows = mutable.MutableList[(Int, Long, String)](
+      (1, 1L, "Hi"),
+      (2, 2L, "Hello"),
+      (3, 2L, "Hello world"),
+      (4, 3L, "Hello world, how are you?"),
+      (5, 3L, "I am fine."),
+      (6, 3L, "Luke Skywalker"),
+      (7, 4L, "Comment#1"),
+      (8, 4L, "Comment#2"),
+      (9, 4L, "Comment#3"),
+      (10, 4L, "Comment#4"),
+      (11, 5L, "Comment#5"),
+      (12, 5L, "Comment#6"),
+      (13, 5L, "Comment#7"),
+      (14, 5L, "Comment#8"),
+      (15, 5L, "Comment#9"),
+      (16, 6L, "Comment#10"),
+      (17, 6L, "Comment#11"),
+      (18, 6L, "Comment#12"),
+      (19, 6L, "Comment#13"),
+      (20, 6L, "Comment#14"),
+      (21, 6L, "Comment#15"))
+    env.fromCollection(rows)
+  }
+
+  /** Returns a stream of 15 `(Int, Long, Int, String, Long)` records. */
+  def get5TupleDataStream(env: StreamExecutionEnvironment):
+      DataStream[(Int, Long, Int, String, Long)] = {
+
+    val rows = mutable.MutableList[(Int, Long, Int, String, Long)](
+      (1, 1L, 0, "Hallo", 1L),
+      (2, 2L, 1, "Hallo Welt", 2L),
+      (2, 3L, 2, "Hallo Welt wie", 1L),
+      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
+      (3, 5L, 4, "ABC", 2L),
+      (3, 6L, 5, "BCD", 3L),
+      (4, 7L, 6, "CDE", 2L),
+      (4, 8L, 7, "DEF", 1L),
+      (4, 9L, 8, "EFG", 1L),
+      (4, 10L, 9, "FGH", 2L),
+      (5, 11L, 10, "GHI", 1L),
+      (5, 12L, 11, "HIJ", 3L),
+      (5, 13L, 12, "IJK", 3L),
+      (5, 14L, 13, "JKL", 2L),
+      (5, 15L, 14, "KLM", 2L))
+    env.fromCollection(rows)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
new file mode 100644
index 0000000..49cf572
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.Date
+
+import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.junit.Test
+
+/**
+  * Tests array expressions: literal construction, array-typed fields, element access via `at`,
+  * `cardinality`, `element` and equality comparison. All assertions go through the
+  * `testTableApi`, `testSqlApi` and `testAllApis` helpers inherited from ExpressionTestBase.
+  */
+class ArrayTypeTest extends ExpressionTestBase {
+
+  // Tests annotated with `expected = classOf[ValidationException]` pass only when that
+  // exception is thrown; their "FAIL" arguments are placeholders.
+
+  @Test(expected = classOf[ValidationException])
+  def testObviousInvalidIndexTableApi(): Unit = {
+    testTableApi('f2.at(0), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testEmptyArraySql(): Unit = {
+    testSqlApi("ARRAY[]", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testEmptyArrayTableApi(): Unit = {
+    testTableApi("FAIL", "array()", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNullArraySql(): Unit = {
+    testSqlApi("ARRAY[NULL]", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testDifferentTypesArraySql(): Unit = {
+    testSqlApi("ARRAY[1, TRUE]", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testDifferentTypesArrayTableApi(): Unit = {
+    testTableApi("FAIL", "array(1, true)", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testUnsupportedComparison(): Unit = {
+    testAllApis(
+      'f2 <= 'f5.at(1),
+      "f2 <= f5.at(1)",
+      "f2 <= f5[1]",
+      "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testElementNonArray(): Unit = {
+    testTableApi(
+      'f0.element(),
+      "FAIL",
+      "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testElementNonArraySql(): Unit = {
+    testSqlApi(
+      "ELEMENT(f0)",
+      "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testCardinalityOnNonArray(): Unit = {
+    testTableApi('f0.cardinality(), "FAIL", "FAIL")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testCardinalityOnNonArraySql(): Unit = {
+    testSqlApi("CARDINALITY(f0)", "FAIL")
+  }
+
+  /**
+    * Array literals built with the `array(...)` expression and through Scala `Array` values,
+    * including nested arrays, computed elements and typed nulls.
+    */
+  @Test
+  def testArrayLiterals(): Unit = {
+    // primitive literals
+    testAllApis(array(1, 2, 3), "array(1, 2, 3)", "ARRAY[1, 2, 3]", "[1, 2, 3]")
+
+    testAllApis(
+      array(true, true, true),
+      "array(true, true, true)",
+      "ARRAY[TRUE, TRUE, TRUE]",
+      "[true, true, true]")
+
+    // object literals
+    testTableApi(array(BigDecimal(1), BigDecimal(1)), "array(1p, 1p)", "[1, 1]")
+
+    testAllApis(
+      array(array(array(1), array(1))),
+      "array(array(array(1), array(1)))",
+      "ARRAY[ARRAY[ARRAY[1], ARRAY[1]]]",
+      "[[[1], [1]]]")
+
+    testAllApis(
+      array(1 + 1, 3 * 3),
+      "array(1 + 1, 3 * 3)",
+      "ARRAY[1 + 1, 3 * 3]",
+      "[2, 9]")
+
+    // the SQL variants use NULLIF(1,1) to produce the null element
+    testAllApis(
+      array(Null(Types.INT), 1),
+      "array(Null(INT), 1)",
+      "ARRAY[NULLIF(1,1), 1]",
+      "[null, 1]")
+
+    testAllApis(
+      array(array(Null(Types.INT), 1)),
+      "array(array(Null(INT), 1))",
+      "ARRAY[ARRAY[NULLIF(1,1), 1]]",
+      "[[null, 1]]")
+
+    // implicit conversion
+    testTableApi(
+      Array(1, 2, 3),
+      "array(1, 2, 3)",
+      "[1, 2, 3]")
+
+    testTableApi(
+      Array[Integer](1, 2, 3),
+      "array(1, 2, 3)",
+      "[1, 2, 3]")
+
+    testAllApis(
+      Array(Date.valueOf("1985-04-11")),
+      "array('1985-04-11'.toDate)",
+      "ARRAY[DATE '1985-04-11']",
+      "[1985-04-11]")
+
+    testAllApis(
+      Array(BigDecimal(2.0002), BigDecimal(2.0003)),
+      "Array(2.0002p, 2.0003p)",
+      "ARRAY[CAST(2.0002 AS DECIMAL), CAST(2.0003 AS DECIMAL)]",
+      "[2.0002, 2.0003]")
+
+    testAllApis(
+      Array(Array(x = true)),
+      "Array(Array(true))",
+      "ARRAY[ARRAY[TRUE]]",
+      "[[true]]")
+
+    testAllApis(
+      Array(Array(1, 2, 3), Array(3, 2, 1)),
+      "Array(Array(1, 2, 3), Array(3, 2, 1))",
+      "ARRAY[ARRAY[1, 2, 3], ARRAY[3, 2, 1]]",
+      "[[1, 2, 3], [3, 2, 1]]")
+  }
+
+  /**
+    * Arrays built from fields, array-typed fields read as a whole, and element access with
+    * `at`. Each distinct expression is asserted once.
+    */
+  @Test
+  def testArrayField(): Unit = {
+    // array built from scalar fields (f0 is null, f1 is 42 in testData)
+    testAllApis(
+      array('f0, 'f1),
+      "array(f0, f1)",
+      "ARRAY[f0, f1]",
+      "[null, 42]")
+
+    // whole array fields
+    testAllApis(
+      'f2,
+      "f2",
+      "f2",
+      "[1, 2, 3]")
+
+    testAllApis(
+      'f3,
+      "f3",
+      "f3",
+      "[1984-03-12, 1984-02-10]")
+
+    testAllApis(
+      'f5,
+      "f5",
+      "f5",
+      "[[1, 2, 3], null]")
+
+    testAllApis(
+      'f6,
+      "f6",
+      "f6",
+      "[1, null, null, 4]")
+
+    // element access; the expected values show that index 1 addresses the first element
+    testAllApis(
+      'f2.at(1),
+      "f2.at(1)",
+      "f2[1]",
+      "1")
+
+    testAllApis(
+      'f3.at(1),
+      "f3.at(1)",
+      "f3[1]",
+      "1984-03-12")
+
+    testAllApis(
+      'f3.at(2),
+      "f3.at(2)",
+      "f3[2]",
+      "1984-02-10")
+
+    testAllApis(
+      'f5.at(1).at(2),
+      "f5.at(1).at(2)",
+      "f5[1][2]",
+      "2")
+
+    // the second element of f5 is null, so nested access is expected to yield null
+    testAllApis(
+      'f5.at(2).at(2),
+      "f5.at(2).at(2)",
+      "f5[2][2]",
+      "null")
+
+    // f4 itself is null
+    testAllApis(
+      'f4.at(2).at(2),
+      "f4.at(2).at(2)",
+      "f4[2][2]",
+      "null")
+  }
+
+  /** Covers `cardinality`, `element` and (in)equality comparison on array fields. */
+  @Test
+  def testArrayOperations(): Unit = {
+    // cardinality
+    testAllApis(
+      'f2.cardinality(),
+      "f2.cardinality()",
+      "CARDINALITY(f2)",
+      "3")
+
+    testAllApis(
+      'f4.cardinality(),
+      "f4.cardinality()",
+      "CARDINALITY(f4)",
+      "null")
+
+    // element
+    testAllApis(
+      'f9.element(),
+      "f9.element()",
+      "ELEMENT(f9)",
+      "1")
+
+    testAllApis(
+      'f8.element(),
+      "f8.element()",
+      "ELEMENT(f8)",
+      "4.0")
+
+    // f10 is an empty array
+    testAllApis(
+      'f10.element(),
+      "f10.element()",
+      "ELEMENT(f10)",
+      "null")
+
+    testAllApis(
+      'f4.element(),
+      "f4.element()",
+      "ELEMENT(f4)",
+      "null")
+
+    // comparison
+    testAllApis(
+      'f2 === 'f5.at(1),
+      "f2 === f5.at(1)",
+      "f2 = f5[1]",
+      "true")
+
+    testAllApis(
+      'f6 === array(1, 2, 3),
+      "f6 === array(1, 2, 3)",
+      "f6 = ARRAY[1, 2, 3]",
+      "false")
+
+    testAllApis(
+      'f2 !== 'f5.at(1),
+      "f2 !== f5.at(1)",
+      "f2 <> f5[1]",
+      "false")
+
+    testAllApis(
+      'f2 === 'f7,
+      "f2 === f7",
+      "f2 = f7",
+      "false")
+
+    testAllApis(
+      'f2 !== 'f7,
+      "f2 !== f7",
+      "f2 <> f7",
+      "true")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  // NOTE(review): not referenced anywhere in this class; looks like a leftover. Confirm that
+  // nothing outside this file uses it before removing.
+  case class MyCaseClass(string: String, int: Int)
+
+  /**
+    * Input row for the tests: 11 fields, referenced in the expressions as f0 to f10
+    * (presumably by position). Fields 0 and 4 are null.
+    */
+  override def testData: Any = {
+    val testData = new Row(11)
+    testData.setField(0, null)
+    testData.setField(1, 42)
+    testData.setField(2, Array(1, 2, 3))
+    testData.setField(3, Array(Date.valueOf("1984-03-12"), Date.valueOf("1984-02-10")))
+    testData.setField(4, null)
+    testData.setField(5, Array(Array(1, 2, 3), null))
+    testData.setField(6, Array[Integer](1, null, null, 4))
+    testData.setField(7, Array(1, 2, 3, 4))
+    testData.setField(8, Array(4.0))
+    testData.setField(9, Array[Integer](1))
+    testData.setField(10, Array[Integer]())
+    testData
+  }
+
+  /** Type information for the 11 fields of `testData`, listed in the same order. */
+  override def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      Types.INT,
+      Types.INT,
+      PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+      ObjectArrayTypeInfo.getInfoFor(Types.DATE),
+      ObjectArrayTypeInfo.getInfoFor(ObjectArrayTypeInfo.getInfoFor(Types.INT)),
+      ObjectArrayTypeInfo.getInfoFor(PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO),
+      ObjectArrayTypeInfo.getInfoFor(Types.INT),
+      PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
+      PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
+      ObjectArrayTypeInfo.getInfoFor(Types.INT),
+      ObjectArrayTypeInfo.getInfoFor(Types.INT)
+    ).asInstanceOf[TypeInformation[Any]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
new file mode 100644
index 0000000..2025880
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.CompositeAccessTest.{MyCaseClass, MyCaseClass2, MyPojo}
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.Test
+
+
+/**
+  * Tests field access on composite values (case classes, Scala and Java tuples, a POJO) with
+  * `get`, dotted SQL paths and `flatten`, using the helpers inherited from ExpressionTestBase.
+  */
+class CompositeAccessTest extends ExpressionTestBase {
+
+  @Test
+  def testGetField(): Unit = {
+
+    // single field by string key
+    testAllApis(
+      'f0.get("intField"),
+      "f0.get('intField')",
+      "testTable.f0.intField",
+      "42")
+
+    testSqlApi("testTable.f0.stringField", "Bob")
+
+    testSqlApi("testTable.f0.booleanField", "true")
+
+    // single field by int key
+    testTableApi(
+      'f0.get(0),
+      "f0.get(0)",
+      "42")
+
+    // nested single field
+    testAllApis(
+      'f1.get("objectField").get("intField"),
+      "f1.get('objectField').get('intField')",
+      "testTable.f1.objectField.intField",
+      "25")
+
+    testSqlApi("testTable.f1.objectField.stringField", "Timo")
+
+    testSqlApi("testTable.f1.objectField.booleanField", "false")
+
+    testAllApis(
+      'f2.get(0),
+      "f2.get(0)",
+      "testTable.f2._1",
+      "a")
+
+    testSqlApi("testTable.f3.f1", "b")
+
+    testSqlApi("testTable.f4.myString", "Hello")
+
+    testSqlApi("testTable.f5", "13")
+
+    testAllApis(
+      'f7.get("_1"),
+      "get(f7, '_1')",
+      "testTable.f7._1",
+      "true")
+
+    // composite field return type
+    testSqlApi("testTable.f6", "MyCaseClass2(null)")
+
+    testAllApis(
+      'f1.get("objectField"),
+      "f1.get('objectField')",
+      "testTable.f1.objectField",
+      "MyCaseClass(25,Timo,false)")
+
+    testAllApis(
+      'f0,
+      "f0",
+      "testTable.f0",
+      "MyCaseClass(42,Bob,true)")
+
+    // flattening (test base only returns first column)
+    testAllApis(
+      'f1.get("objectField").flatten(),
+      "f1.get('objectField').flatten()",
+      "testTable.f1.objectField.*",
+      "25")
+
+    testAllApis(
+      'f0.flatten(),
+      "flatten(f0)",
+      "testTable.f0.*",
+      "42")
+
+    testTableApi(12.flatten(), "12.flatten()", "12")
+
+    testTableApi('f5.flatten(), "f5.flatten()", "13")
+  }
+
+  // The tests below pass only when a ValidationException is thrown; the remaining
+  // arguments are placeholders.
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongSqlField(): Unit = {
+    testSqlApi("testTable.f5.test", "13")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongIntKeyField(): Unit = {
+    testTableApi('f0.get(555), "'fail'", "fail")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongIntKeyField2(): Unit = {
+    testTableApi("fail", "f0.get(555)", "fail")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongStringKeyField(): Unit = {
+    testTableApi('f0.get("fghj"), "'fail'", "fail")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testWrongStringKeyField2(): Unit = {
+    testTableApi("fail", "f0.get('fghj')", "fail")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Input row for the tests: 8 fields, referenced in the expressions as f0 to f7
+    * (presumably by position). Declared with `override` and an explicit result type, like the
+    * other ExpressionTestBase subclasses.
+    */
+  override def testData: Any = {
+    val testData = new Row(8)
+    testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
+    testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
+    testData.setField(2, ("a", "b"))
+    testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
+    testData.setField(4, new MyPojo())
+    testData.setField(5, 13)
+    testData.setField(6, MyCaseClass2(null))
+    testData.setField(7, Tuple1(true))
+    testData
+  }
+
+  /** Type information for the 8 fields of `testData`, listed in the same order. */
+  override def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      createTypeInformation[MyCaseClass],
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[(String, String)],
+      new TupleTypeInfo(Types.STRING, Types.STRING),
+      TypeExtractor.createTypeInfo(classOf[MyPojo]),
+      Types.INT,
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[Tuple1[Boolean]]
+    ).asInstanceOf[TypeInformation[Any]]
+  }
+
+}
+
+/** Composite fixture types used by the CompositeAccessTest class. */
+object CompositeAccessTest {
+  case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
+
+  case class MyCaseClass2(objectField: MyCaseClass)
+
+  /**
+    * Mutable bean with two private fields exposed through getter/setter pairs.
+    * `myInt` starts at 0 and `myString` at "Hello".
+    */
+  class MyPojo {
+    private var myInt: Int = 0
+    private var myString: String = "Hello"
+
+    def getMyInt: Int = myInt
+
+    def setMyInt(value: Int): Unit = {
+      myInt = value
+    }
+
+    def getMyString: String = myString
+
+    def setMyString(value: String): Unit = {
+      // Store the argument; the previous self-assignment made this setter a no-op.
+      myString = value
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
new file mode 100644
index 0000000..bdc239d
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/DecimalTypeTest.scala
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.Test
+
+/**
+  * Tests decimal and numeric-border expressions: literals, casts, arithmetic and comparison,
+  * using the helpers inherited from ExpressionTestBase.
+  */
+class DecimalTypeTest extends ExpressionTestBase {
+
+  @Test
+  def testDecimalLiterals(): Unit = {
+    // implicit double
+    testAllApis(
+      11.2,
+      "11.2",
+      "11.2",
+      "11.2")
+
+    // implicit double
+    testAllApis(
+      0.7623533651719233,
+      "0.7623533651719233",
+      "0.7623533651719233",
+      "0.7623533651719233")
+
+    // explicit decimal (with precision of 19)
+    testAllApis(
+      BigDecimal("1234567891234567891"),
+      "1234567891234567891p",
+      "1234567891234567891",
+      "1234567891234567891")
+
+    // explicit decimal (high precision, not SQL compliant)
+    testTableApi(
+      BigDecimal("123456789123456789123456789"),
+      "123456789123456789123456789p",
+      "123456789123456789123456789")
+
+    // explicit decimal (high precision, not SQL compliant)
+    testTableApi(
+      BigDecimal("12.3456789123456789123456789"),
+      "12.3456789123456789123456789p",
+      "12.3456789123456789123456789")
+  }
+
+  @Test
+  def testDecimalBorders(): Unit = {
+    testAllApis(
+      Double.MaxValue,
+      Double.MaxValue.toString,
+      Double.MaxValue.toString,
+      Double.MaxValue.toString)
+
+    testAllApis(
+      Double.MinValue,
+      Double.MinValue.toString,
+      Double.MinValue.toString,
+      Double.MinValue.toString)
+
+    // casting Double.MinValue to FLOAT is expected to yield negative infinity
+    testAllApis(
+      Double.MinValue.cast(Types.FLOAT),
+      s"${Double.MinValue}.cast(FLOAT)",
+      s"CAST(${Double.MinValue} AS FLOAT)",
+      Float.NegativeInfinity.toString)
+
+    testAllApis(
+      Byte.MinValue.cast(Types.BYTE),
+      s"(${Byte.MinValue}).cast(BYTE)",
+      s"CAST(${Byte.MinValue} AS TINYINT)",
+      Byte.MinValue.toString)
+
+    // subtracting 1 from the minimum is expected to wrap around to the maximum
+    testAllApis(
+      Byte.MinValue.cast(Types.BYTE) - 1.cast(Types.BYTE),
+      s"(${Byte.MinValue}).cast(BYTE) - (1).cast(BYTE)",
+      s"CAST(${Byte.MinValue} AS TINYINT) - CAST(1 AS TINYINT)",
+      Byte.MaxValue.toString)
+
+    testAllApis(
+      Short.MinValue.cast(Types.SHORT),
+      s"(${Short.MinValue}).cast(SHORT)",
+      s"CAST(${Short.MinValue} AS SMALLINT)",
+      Short.MinValue.toString)
+
+    testAllApis(
+      Int.MinValue.cast(Types.INT) - 1,
+      s"(${Int.MinValue}).cast(INT) - 1",
+      s"CAST(${Int.MinValue} AS INT) - 1",
+      Int.MaxValue.toString)
+
+    testAllApis(
+      Long.MinValue.cast(Types.LONG),
+      s"(${Long.MinValue}L).cast(LONG)",
+      s"CAST(${Long.MinValue} AS BIGINT)",
+      Long.MinValue.toString)
+  }
+
+  @Test
+  def testDecimalCasting(): Unit = {
+    // from String
+    testTableApi(
+      "123456789123456789123456789".cast(Types.DECIMAL),
+      "'123456789123456789123456789'.cast(DECIMAL)",
+      "123456789123456789123456789")
+
+    // from double
+    testAllApis(
+      'f3.cast(Types.DECIMAL),
+      "f3.cast(DECIMAL)",
+      "CAST(f3 AS DECIMAL)",
+      "4.2")
+
+    // to double
+    testAllApis(
+      'f0.cast(Types.DOUBLE),
+      "f0.cast(DOUBLE)",
+      "CAST(f0 AS DOUBLE)",
+      "1.2345678912345679E8")
+
+    // to int
+    testAllApis(
+      'f4.cast(Types.INT),
+      "f4.cast(INT)",
+      "CAST(f4 AS INT)",
+      "123456789")
+
+    // to long
+    testAllApis(
+      'f4.cast(Types.LONG),
+      "f4.cast(LONG)",
+      "CAST(f4 AS BIGINT)",
+      "123456789")
+
+    // to boolean (not SQL compliant)
+    testTableApi(
+      'f1.cast(Types.BOOLEAN),
+      "f1.cast(BOOL)",
+      "true")
+
+    testTableApi(
+      'f5.cast(Types.BOOLEAN),
+      "f5.cast(BOOL)",
+      "false")
+
+    testTableApi(
+      BigDecimal("123456789.123456789123456789").cast(Types.DOUBLE),
+      "(123456789.123456789123456789p).cast(DOUBLE)",
+      "1.2345678912345679E8")
+  }
+
+  @Test
+  def testDecimalArithmetic(): Unit = {
+    // implicit cast to decimal
+    testAllApis(
+      'f1 + 12,
+      "f1 + 12",
+      "f1 + 12",
+      "123456789123456789123456801")
+
+    // implicit cast to decimal
+    testAllApis(
+      Literal(12) + 'f1,
+      "12 + f1",
+      "12 + f1",
+      "123456789123456789123456801")
+
+    // implicit cast to decimal
+    testAllApis(
+      'f1 + 12.3,
+      "f1 + 12.3",
+      "f1 + 12.3",
+      "123456789123456789123456801.3")
+
+    // implicit cast to decimal
+    testAllApis(
+      Literal(12.3) + 'f1,
+      "12.3 + f1",
+      "12.3 + f1",
+      "123456789123456789123456801.3")
+
+    testAllApis(
+      'f1 + 'f1,
+      "f1 + f1",
+      "f1 + f1",
+      "246913578246913578246913578")
+
+    testAllApis(
+      'f1 - 'f1,
+      "f1 - f1",
+      "f1 - f1",
+      "0")
+
+    testAllApis(
+      'f1 * 'f1,
+      "f1 * f1",
+      "f1 * f1",
+      "15241578780673678546105778281054720515622620750190521")
+
+    testAllApis(
+      'f1 / 'f1,
+      "f1 / f1",
+      "f1 / f1",
+      "1")
+
+    testAllApis(
+      'f1 % 'f1,
+      "f1 % f1",
+      "MOD(f1, f1)",
+      "0")
+
+    testAllApis(
+      -'f0,
+      "-f0",
+      "-f0",
+      "-123456789.123456789123456789")
+  }
+
+  @Test
+  def testDecimalComparison(): Unit = {
+    testAllApis(
+      'f1 < 12,
+      "f1 < 12",
+      "f1 < 12",
+      "false")
+
+    testAllApis(
+      'f1 > 12,
+      "f1 > 12",
+      "f1 > 12",
+      "true")
+
+    testAllApis(
+      'f1 === 12,
+      "f1 === 12",
+      "f1 = 12",
+      "false")
+
+    // f5 holds 0.000 and is expected to compare equal to 0
+    testAllApis(
+      'f5 === 0,
+      "f5 === 0",
+      "f5 = 0",
+      "true")
+
+    testAllApis(
+      'f1 === BigDecimal("123456789123456789123456789"),
+      "f1 === 123456789123456789123456789p",
+      "f1 = CAST('123456789123456789123456789' AS DECIMAL)",
+      "true")
+
+    testAllApis(
+      'f1 !== BigDecimal("123456789123456789123456789"),
+      "f1 !== 123456789123456789123456789p",
+      "f1 <> CAST('123456789123456789123456789' AS DECIMAL)",
+      "false")
+
+    testAllApis(
+      'f4 < 'f0,
+      "f4 < f0",
+      "f4 < f0",
+      "true")
+
+    // TODO add all tests if FLINK-4070 is fixed
+    testSqlApi(
+      "12 < f1",
+      "true")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Input row for the tests: 6 fields, referenced in the expressions as f0 to f5
+    * (presumably by position). Decimal fields are stored as `java.math.BigDecimal`.
+    * Declared with `override` and an explicit result type, like the other
+    * ExpressionTestBase subclasses.
+    */
+  override def testData: Any = {
+    val testData = new Row(6)
+    testData.setField(0, BigDecimal("123456789.123456789123456789").bigDecimal)
+    testData.setField(1, BigDecimal("123456789123456789123456789").bigDecimal)
+    testData.setField(2, 42)
+    testData.setField(3, 4.2)
+    testData.setField(4, BigDecimal("123456789").bigDecimal)
+    testData.setField(5, BigDecimal("0.000").bigDecimal)
+    testData
+  }
+
+  /** Type information for the 6 fields of `testData`, listed in the same order. */
+  override def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      Types.DECIMAL,
+      Types.DECIMAL,
+      Types.INT,
+      Types.DOUBLE,
+      Types.DECIMAL,
+      Types.DECIMAL).asInstanceOf[TypeInformation[Any]]
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
new file mode 100644
index 0000000..bcc53af
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.types.Row
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.{Ignore, Test}
+
+/**
+  * Expression tests whose results change from run to run and therefore have to be checked
+  * by hand. Every test is annotated with `@Ignore`.
+  */
+class NonDeterministicTests extends ExpressionTestBase {
+
+  @Ignore
+  @Test
+  def testCurrentDate(): Unit =
+    testAllApis(
+      currentDate(), "currentDate()", "CURRENT_DATE", "PLEASE CHECK MANUALLY")
+
+  @Ignore
+  @Test
+  def testCurrentTime(): Unit =
+    testAllApis(
+      currentTime(), "currentTime()", "CURRENT_TIME", "PLEASE CHECK MANUALLY")
+
+  @Ignore
+  @Test
+  def testCurrentTimestamp(): Unit =
+    testAllApis(
+      currentTimestamp(), "currentTimestamp()", "CURRENT_TIMESTAMP", "PLEASE CHECK MANUALLY")
+
+  @Ignore
+  @Test
+  def testLocalTimestamp(): Unit =
+    testAllApis(
+      localTimestamp(), "localTimestamp()", "LOCALTIMESTAMP", "PLEASE CHECK MANUALLY")
+
+  @Ignore
+  @Test
+  def testLocalTime(): Unit =
+    testAllApis(
+      localTime(), "localTime()", "LOCALTIME", "PLEASE CHECK MANUALLY")
+
+  // ----------------------------------------------------------------------------------------------
+
+  /** The tests reference no input fields, so the input row has zero fields. */
+  override def testData: Any = {
+    val emptyRow = new Row(0)
+    emptyRow
+  }
+
+  /** Row type without any fields, matching `testData`. */
+  override def typeInfo: TypeInformation[Any] = {
+    val emptyRowType = new RowTypeInfo()
+    emptyRowType.asInstanceOf[TypeInformation[Any]]
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
new file mode 100644
index 0000000..596907b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -0,0 +1,1166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{Types, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.expressions.utils.ExpressionTestBase
+import org.junit.Test
+
+class ScalarFunctionsTest extends ExpressionTestBase {
+
+  // ----------------------------------------------------------------------------------------------
+  // String functions
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Checks string overlay without and with an explicit replacement length.
+    *
+    * Every case passes the same operation to `testAllApis` as a Scala expression, as an
+    * expression string and as a SQL string, followed by the expected result string.
+    */
+  @Test
+  def testOverlay(): Unit = {
+    // no length given: "test" is replaced completely by the four-character replacement
+    testAllApis(
+      "xxxxxtest".overlay("xxxx", 6),
+      "'xxxxxtest'.overlay('xxxx', 6)",
+      "OVERLAY('xxxxxtest' PLACING 'xxxx' FROM 6)",
+      "xxxxxxxxx")
+
+    // length 2: only "te" is replaced, the trailing "st" is kept
+    testAllApis(
+      "xxxxxtest".overlay("xxxx", 6, 2),
+      "'xxxxxtest'.overlay('xxxx', 6, 2)",
+      "OVERLAY('xxxxxtest' PLACING 'xxxx' FROM 6 FOR 2)",
+      "xxxxxxxxxst")
+  }
+
+  /**
+    * Checks the position of a substring within a string, for a match and for a miss.
+    */
+  @Test
+  def testPosition(): Unit = {
+    // "test" occurs in "xxxtest": the expected position is 4
+    testAllApis(
+      "test".position("xxxtest"),
+      "'test'.position('xxxtest')",
+      "POSITION('test' IN 'xxxtest')",
+      "4")
+
+    // "testx" does not occur in "xxxtest": the expected result is 0
+    testAllApis(
+      "testx".position("xxxtest"),
+      "'testx'.position('xxxtest')",
+      "POSITION('testx' IN 'xxxtest')",
+      "0")
+  }
+
+  /**
+    * Checks substring extraction on field `f0` ("This is a test String.").
+    *
+    * Covers a start index only, a start index with a length, a length taken from field `f7`
+    * (the int 3), a start index cast to a byte, and the SQL-only `FROM ... FOR ...` syntax.
+    */
+  @Test
+  def testSubstring(): Unit = {
+    // start index only; the expected result drops the first character
+    testAllApis(
+      'f0.substring(2),
+      "f0.substring(2)",
+      "SUBSTRING(f0, 2)",
+      "his is a test String.")
+
+    // start index and length
+    testAllApis(
+      'f0.substring(2, 5),
+      "f0.substring(2, 5)",
+      "SUBSTRING(f0, 2, 5)",
+      "his i")
+
+    // length read from field f7
+    testAllApis(
+      'f0.substring(1, 'f7),
+      "f0.substring(1, f7)",
+      "SUBSTRING(f0, 1, f7)",
+      "Thi")
+
+    // start index given as a byte instead of an int
+    testAllApis(
+      'f0.substring(1.cast(Types.BYTE), 'f7),
+      "f0.substring(1.cast(BYTE), f7)",
+      "SUBSTRING(f0, CAST(1 AS TINYINT), f7)",
+      "Thi")
+
+    // SQL keyword syntax, checked through the SQL API only
+    testSqlApi(
+      "SUBSTRING(f0 FROM 2 FOR 1)",
+      "h")
+
+    testSqlApi(
+      "SUBSTRING(f0 FROM 2)",
+      "his is a test String.")
+  }
+
+  /**
+    * A Double is not accepted as substring parameter (an Integer is required), so a
+    * `ValidationException` is expected.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSubstring1(): Unit = {
+    val doubleBegin = 2.0.toExpr
+    // the two "FAIL" strings are placeholders; the test passes only if validation fails
+    testTableApi("test".substring(doubleBegin), "FAIL", "FAIL")
+  }
+
+  /**
+    * A String is not accepted as substring parameter (an Integer is required), so a
+    * `ValidationException` is expected.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidSubstring2(): Unit = {
+    val stringBegin = "test".toExpr
+    // the two "FAIL" strings are placeholders; the test passes only if validation fails
+    testTableApi("test".substring(stringBegin), "FAIL", "FAIL")
+  }
+
+  /**
+    * Checks trimming on `f8` (the test sentence with one leading and one trailing blank) and
+    * on `f0` (the test sentence ending with a dot).
+    *
+    * The cases vary the Scala arguments and the string/SQL syntax: default trim, explicit
+    * both-sides trim, trailing-only trim and trimming a custom character.
+    */
+  @Test
+  def testTrim(): Unit = {
+    // default arguments
+    testAllApis(
+      'f8.trim(),
+      "f8.trim()",
+      "TRIM(f8)",
+      "This is a test String.")
+
+    // explicit arguments in Scala, prefix notation in the expression string
+    testAllApis(
+      'f8.trim(removeLeading = true, removeTrailing = true, " "),
+      "trim(f8)",
+      "TRIM(f8)",
+      "This is a test String.")
+
+    // trailing only: the leading blank stays in the expected result
+    testAllApis(
+      'f8.trim(removeLeading = false, removeTrailing = true, " "),
+      "f8.trim(TRAILING, ' ')",
+      "TRIM(TRAILING FROM f8)",
+      " This is a test String.")
+
+    // custom trim character: the dot at the end of f0 is removed
+    testAllApis(
+      'f0.trim(removeLeading = true, removeTrailing = true, "."),
+      "trim(BOTH, '.', f0)",
+      "TRIM(BOTH '.' FROM f0)",
+      "This is a test String")
+  }
+
+  /**
+    * Checks the character length of `f0`; the expected result is 22 in both cases.
+    *
+    * The second case repeats the check with the prefix notation in the expression string and
+    * with `CHARACTER_LENGTH` instead of `CHAR_LENGTH` in SQL.
+    */
+  @Test
+  def testCharLength(): Unit = {
+    testAllApis(
+      'f0.charLength(),
+      "f0.charLength()",
+      "CHAR_LENGTH(f0)",
+      "22")
+
+    // alternative spellings in the expression string and in SQL
+    testAllApis(
+      'f0.charLength(),
+      "charLength(f0)",
+      "CHARACTER_LENGTH(f0)",
+      "22")
+  }
+
+  /** Upper-casing `f0` is expected to yield the test sentence in capital letters. */
+  @Test
+  def testUpperCase(): Unit = {
+    val expected = "THIS IS A TEST STRING."
+    testAllApis('f0.upperCase(), "f0.upperCase()", "UPPER(f0)", expected)
+  }
+
+  /** Lower-casing `f0` is expected to yield the test sentence in small letters. */
+  @Test
+  def testLowerCase(): Unit = {
+    val expected = "this is a test string."
+    testAllApis('f0.lowerCase(), "f0.lowerCase()", "LOWER(f0)", expected)
+  }
+
+  /** `initCap` on `f0` is expected to capitalize the first letter of every word. */
+  @Test
+  def testInitCap(): Unit = {
+    val expected = "This Is A Test String."
+    testAllApis('f0.initCap(), "f0.initCap()", "INITCAP(f0)", expected)
+  }
+
+  /** Concatenating `f0` with itself is expected to yield the test sentence twice in a row. */
+  @Test
+  def testConcat(): Unit = {
+    val sentence = "This is a test String."
+    testAllApis('f0 + 'f0, "f0 + f0", "f0||f0", sentence + sentence)
+  }
+
+  /** Both LIKE patterns are expected to match `f0`. */
+  @Test
+  def testLike(): Unit = {
+    val matchingPatterns = Seq("Th_s%", "%is a%")
+
+    for (pattern <- matchingPatterns) {
+      testAllApis(
+        'f0.like(pattern),
+        s"f0.like('$pattern')",
+        s"f0 LIKE '$pattern'",
+        "true")
+    }
+  }
+
+  /** Negating a LIKE that matches `f0` is expected to yield false for both patterns. */
+  @Test
+  def testNotLike(): Unit = {
+    val matchingPatterns = Seq("Th_s%", "%is a%")
+
+    for (pattern <- matchingPatterns) {
+      testAllApis(
+        !'f0.like(pattern),
+        s"!f0.like('$pattern')",
+        s"f0 NOT LIKE '$pattern'",
+        "false")
+    }
+  }
+
+  /**
+    * LIKE with an ESCAPE clause, checked through SQL only; every predicate is expected to be
+    * true. `f23` is the test sentence prefixed with a literal '%'.
+    */
+  @Test
+  def testLikeWithEscape(): Unit = {
+    val predicates = Seq(
+      "f23 LIKE '&%Th_s%' ESCAPE '&'",
+      "f23 LIKE '&%%is a%' ESCAPE '&'",
+      "f0 LIKE 'Th_s%' ESCAPE '&'",
+      "f0 LIKE '%is a%' ESCAPE '&'")
+
+    predicates.foreach(sql => testSqlApi(sql, "true"))
+  }
+
+  /**
+    * NOT LIKE with an ESCAPE clause, checked through SQL only; every predicate is expected to
+    * be false. `f23` is the test sentence prefixed with a literal '%'.
+    */
+  @Test
+  def testNotLikeWithEscape(): Unit = {
+    val predicates = Seq(
+      "f23 NOT LIKE '&%Th_s%' ESCAPE '&'",
+      "f23 NOT LIKE '&%%is a%' ESCAPE '&'",
+      "f0 NOT LIKE 'Th_s%' ESCAPE '&'",
+      "f0 NOT LIKE '%is a%' ESCAPE '&'")
+
+    predicates.foreach(sql => testSqlApi(sql, "false"))
+  }
+
+  /** Both SIMILAR TO patterns are expected to match `f0`. */
+  @Test
+  def testSimilar(): Unit = {
+    val matchingPatterns = Seq("_*", "This (is)? a (test)+ Strin_*")
+
+    for (pattern <- matchingPatterns) {
+      testAllApis(
+        'f0.similar(pattern),
+        s"f0.similar('$pattern')",
+        s"f0 SIMILAR TO '$pattern'",
+        "true")
+    }
+  }
+
+  /** Negating a SIMILAR TO that matches `f0` is expected to yield false for both patterns. */
+  @Test
+  def testNotSimilar(): Unit = {
+    val matchingPatterns = Seq("_*", "This (is)? a (test)+ Strin_*")
+
+    for (pattern <- matchingPatterns) {
+      testAllApis(
+        !'f0.similar(pattern),
+        s"!f0.similar('$pattern')",
+        s"f0 NOT SIMILAR TO '$pattern'",
+        "false")
+    }
+  }
+
+  /**
+    * SIMILAR TO with an ESCAPE clause, checked through SQL only; every predicate is expected
+    * to be true. `f24` is the test sentence prefixed with the literal characters "*_".
+    */
+  @Test
+  def testSimilarWithEscape(): Unit = {
+    val predicates = Seq(
+      "f24 SIMILAR TO '&*&__*' ESCAPE '&'",
+      "f0 SIMILAR TO '_*' ESCAPE '&'",
+      "f24 SIMILAR TO '&*&_This (is)? a (test)+ Strin_*' ESCAPE '&'",
+      "f0 SIMILAR TO 'This (is)? a (test)+ Strin_*' ESCAPE '&'")
+
+    predicates.foreach(sql => testSqlApi(sql, "true"))
+  }
+
+  /**
+    * NOT SIMILAR TO with an ESCAPE clause, checked through SQL only; every predicate is
+    * expected to be false. `f24` is the test sentence prefixed with the literal characters "*_".
+    */
+  @Test
+  def testNotSimilarWithEscape(): Unit = {
+    val predicates = Seq(
+      "f24 NOT SIMILAR TO '&*&__*' ESCAPE '&'",
+      "f0 NOT SIMILAR TO '_*' ESCAPE '&'",
+      "f24 NOT SIMILAR TO '&*&_This (is)? a (test)+ Strin_*' ESCAPE '&'",
+      "f0 NOT SIMILAR TO 'This (is)? a (test)+ Strin_*' ESCAPE '&'")
+
+    predicates.foreach(sql => testSqlApi(sql, "false"))
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Math functions
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Checks the modulo operation; `f4` holds the long 44 and `f7` the int 3, so every case is
+    * expected to yield 2.
+    */
+  @Test
+  def testMod(): Unit = {
+    // both operands are fields
+    testAllApis(
+      'f4.mod('f7),
+      "f4.mod(f7)",
+      "MOD(f4, f7)",
+      "2")
+
+    // literal divisor, prefix notation in the expression string
+    testAllApis(
+      'f4.mod(3),
+      "mod(f4, 3)",
+      "MOD(f4, 3)",
+      "2")
+
+    // `%` operator in Scala; the string and SQL variants use the literal 44 instead of f4
+    testAllApis(
+      'f4 % 3,
+      "mod(44, 3)",
+      "MOD(44, 3)",
+      "2")
+
+  }
+
+  /**
+    * Checks the exponential function on byte, short, long, float, double and int input.
+    *
+    * Expected values are computed with `math.exp` on the values stored in `testData`. The int
+    * input is covered twice: once through field `f7` (which holds 3) and once through the
+    * literal 3, each consistently in all three syntaxes.
+    */
+  @Test
+  def testExp(): Unit = {
+    // f2: byte
+    testAllApis(
+      'f2.exp(),
+      "f2.exp()",
+      "EXP(f2)",
+      math.exp(42.toByte).toString)
+
+    // f3: short
+    testAllApis(
+      'f3.exp(),
+      "f3.exp()",
+      "EXP(f3)",
+      math.exp(43.toShort).toString)
+
+    // f4: long
+    testAllApis(
+      'f4.exp(),
+      "f4.exp()",
+      "EXP(f4)",
+      math.exp(44.toLong).toString)
+
+    // f5: float
+    testAllApis(
+      'f5.exp(),
+      "f5.exp()",
+      "EXP(f5)",
+      math.exp(4.5.toFloat).toString)
+
+    // f6: double
+    testAllApis(
+      'f6.exp(),
+      "f6.exp()",
+      "EXP(f6)",
+      math.exp(4.6).toString)
+
+    // f7: int field
+    testAllApis(
+      'f7.exp(),
+      "f7.exp()",
+      "EXP(f7)",
+      math.exp(3).toString)
+
+    // int literal, prefix notation in the expression string
+    testAllApis(
+      3.exp(),
+      "exp(3)",
+      "EXP(3)",
+      math.exp(3).toString)
+  }
+
+  /**
+    * Checks the base-10 logarithm on the byte, short, long, float and double fields; expected
+    * values are computed with `math.log10`.
+    */
+  @Test
+  def testLog10(): Unit = {
+    // field name -> value stored in that field, widened to Double
+    val inputs = Seq("f2" -> 42.0, "f3" -> 43.0, "f4" -> 44.0, "f5" -> 4.5, "f6" -> 4.6)
+
+    for ((field, value) <- inputs) {
+      testAllApis(
+        Symbol(field).log10(),
+        s"$field.log10()",
+        s"LOG10($field)",
+        math.log10(value).toString)
+    }
+  }
+
+  /**
+    * Checks `power` for combinations of byte, short, long, float, double, int and decimal
+    * operands; expected values are computed with `math.pow` on the values from `testData`.
+    */
+  @Test
+  def testPower(): Unit = {
+    // f2: byte, f3: short, f4: long, f5: float, f6: double, f7: int
+    testAllApis(
+      'f2.power('f7),
+      "f2.power(f7)",
+      "POWER(f2, f7)",
+      math.pow(42.toByte, 3).toString)
+
+    testAllApis(
+      'f3.power('f6),
+      "f3.power(f6)",
+      "POWER(f3, f6)",
+      math.pow(43.toShort, 4.6D).toString)
+
+    testAllApis(
+      'f4.power('f5),
+      "f4.power(f5)",
+      "POWER(f4, f5)",
+      math.pow(44.toLong, 4.5.toFloat).toString)
+
+    // float base (f5) with float, double, int and long exponents
+    testAllApis(
+      'f5.power('f5),
+      "f5.power(f5)",
+      "power(f5, f5)",
+      math.pow(4.5F, 4.5F).toString)
+
+    testAllApis(
+      'f5.power('f6),
+      "f5.power(f6)",
+      "power(f5, f6)",
+      math.pow(4.5F, 4.6D).toString)
+
+    testAllApis(
+      'f5.power('f7),
+      "f5.power(f7)",
+      "power(f5, f7)",
+      math.pow(4.5F, 3).toString)
+
+    testAllApis(
+      'f5.power('f4),
+      "f5.power(f4)",
+      "power(f5, f4)",
+      math.pow(4.5F, 44L).toString)
+
+    // f22: bigDecimal
+    // TODO delete casting in SQL when CALCITE-1467 is fixed
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f5),
+      "f22.cast(DOUBLE).power(f5)",
+      "power(CAST(f22 AS DOUBLE), f5)",
+      math.pow(2, 4.5F).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f6),
+      "f22.cast(DOUBLE).power(f6)",
+      "power(CAST(f22 AS DOUBLE), f6)",
+      math.pow(2, 4.6D).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f7),
+      "f22.cast(DOUBLE).power(f7)",
+      "power(CAST(f22 AS DOUBLE), f7)",
+      math.pow(2, 3).toString)
+
+    testAllApis(
+      'f22.cast(Types.DOUBLE).power('f4),
+      "f22.cast(DOUBLE).power(f4)",
+      "power(CAST(f22 AS DOUBLE), f4)",
+      math.pow(2, 44L).toString)
+
+    // decimal as exponent; the SQL variant passes f22 without a cast
+    testAllApis(
+      'f6.power('f22.cast(Types.DOUBLE)),
+      "f6.power(f22.cast(DOUBLE))",
+      "power(f6, f22)",
+      math.pow(4.6D, 2).toString)
+  }
+
+  /**
+    * Checks the square root on double, int, long, decimal (cast to double) and float fields
+    * as well as on an int and a double literal.
+    *
+    * The field-based cases write the expression string without parentheses (`f6.sqrt`).
+    */
+  @Test
+  def testSqrt(): Unit = {
+    // f6: double
+    testAllApis(
+      'f6.sqrt(),
+      "f6.sqrt",
+      "SQRT(f6)",
+      math.sqrt(4.6D).toString)
+
+    // f7: int
+    testAllApis(
+      'f7.sqrt(),
+      "f7.sqrt",
+      "SQRT(f7)",
+      math.sqrt(3).toString)
+
+    // f4: long
+    testAllApis(
+      'f4.sqrt(),
+      "f4.sqrt",
+      "SQRT(f4)",
+      math.sqrt(44L).toString)
+
+    // f22: bigDecimal, cast to double first
+    testAllApis(
+      'f22.cast(Types.DOUBLE).sqrt(),
+      "f22.cast(DOUBLE).sqrt",
+      "SQRT(CAST(f22 AS DOUBLE))",
+      math.sqrt(2.0).toString)
+
+    // f5: float; the expected value is computed as a power of 0.5
+    testAllApis(
+      'f5.sqrt(),
+      "f5.sqrt",
+      "SQRT(f5)",
+      math.pow(4.5F, 0.5).toString)
+
+    // int literal
+    testAllApis(
+      25.sqrt(),
+      "25.sqrt()",
+      "SQRT(25)",
+      "5.0")
+
+    // double literal; the SQL variant is written as POWER with explicit casts (see TODO)
+    testAllApis(
+      2.2.sqrt(),
+      "2.2.sqrt()",
+      "POWER(CAST(2.2 AS DOUBLE), CAST(0.5 AS DOUBLE))", // TODO fix FLINK-4621
+      math.sqrt(2.2).toString)
+  }
+
+  /**
+    * Checks the natural logarithm on the byte, short, long, float and double fields; expected
+    * values are computed with `math.log`.
+    */
+  @Test
+  def testLn(): Unit = {
+    // field name -> value stored in that field, widened to Double
+    val inputs = Seq("f2" -> 42.0, "f3" -> 43.0, "f4" -> 44.0, "f5" -> 4.5, "f6" -> 4.6)
+
+    for ((field, value) <- inputs) {
+      testAllApis(
+        Symbol(field).ln(),
+        s"$field.ln()",
+        s"LN($field)",
+        math.log(value).toString)
+    }
+  }
+
+  /**
+    * Checks the absolute value on the positive fields `f2` to `f6`, on their negative
+    * counterparts `f9` to `f13` and on the negative decimal `f15`.
+    */
+  @Test
+  def testAbs(): Unit = {
+    // field name -> expected absolute value
+    val expectedByField = Seq(
+      "f2" -> "42",
+      "f3" -> "43",
+      "f4" -> "44",
+      "f5" -> "4.5",
+      "f6" -> "4.6",
+      "f9" -> "42",
+      "f10" -> "43",
+      "f11" -> "44",
+      "f12" -> "4.5",
+      "f13" -> "4.6",
+      "f15" -> "1231.1231231321321321111")
+
+    expectedByField.foreach { case (field, expected) =>
+      testAllApis(
+        Symbol(field).abs(),
+        s"$field.abs()",
+        s"ABS($field)",
+        expected)
+    }
+  }
+
+  /**
+    * Checks numeric `floor` and `ceil` on a float (`f5` = 4.5), a short (`f3` = 43) and a
+    * negative decimal (`f15` = -1231.1231231321321321111).
+    */
+  @Test
+  def testArithmeticFloorCeil(): Unit = {
+    // float
+    testAllApis(
+      'f5.floor(),
+      "f5.floor()",
+      "FLOOR(f5)",
+      "4.0")
+
+    testAllApis(
+     'f5.ceil(),
+      "f5.ceil()",
+      "CEIL(f5)",
+      "5.0")
+
+    // short: the value is expected to stay unchanged
+    testAllApis(
+      'f3.floor(),
+      "f3.floor()",
+      "FLOOR(f3)",
+      "43")
+
+    testAllApis(
+      'f3.ceil(),
+      "f3.ceil()",
+      "CEIL(f3)",
+      "43")
+
+    // negative decimal
+    testAllApis(
+      'f15.floor(),
+      "f15.floor()",
+      "FLOOR(f15)",
+      "-1232")
+
+    testAllApis(
+      'f15.ceil(),
+      "f15.ceil()",
+      "CEIL(f15)",
+      "-1231")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Temporal functions
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Checks `extract` for the time units that apply to each temporal field.
+    *
+    * Inputs from `testData`: `f16` is the date 1996-11-10, `f17` the time 06:55:44, `f18` the
+    * timestamp 1996-11-10 06:55:44.333, `f19` a millisecond interval (+16979 07:23:33.000) and
+    * `f20` a month interval (+2-01).
+    */
+  @Test
+  def testExtract(): Unit = {
+    // date field
+    testAllApis(
+      'f16.extract(TimeIntervalUnit.YEAR),
+      "f16.extract(YEAR)",
+      "EXTRACT(YEAR FROM f16)",
+      "1996")
+
+    // this case uses the prefix notation in the expression string
+    testAllApis(
+      'f16.extract(TimeIntervalUnit.MONTH),
+      "extract(f16, MONTH)",
+      "EXTRACT(MONTH FROM f16)",
+      "11")
+
+    testAllApis(
+      'f16.extract(TimeIntervalUnit.DAY),
+      "f16.extract(DAY)",
+      "EXTRACT(DAY FROM f16)",
+      "10")
+
+    // timestamp field: date part
+    testAllApis(
+      'f18.extract(TimeIntervalUnit.YEAR),
+      "f18.extract(YEAR)",
+      "EXTRACT(YEAR FROM f18)",
+      "1996")
+
+    testAllApis(
+      'f18.extract(TimeIntervalUnit.MONTH),
+      "f18.extract(MONTH)",
+      "EXTRACT(MONTH FROM f18)",
+      "11")
+
+    testAllApis(
+      'f18.extract(TimeIntervalUnit.DAY),
+      "f18.extract(DAY)",
+      "EXTRACT(DAY FROM f18)",
+      "10")
+
+    // time part, alternating between the timestamp (f18) and the time (f17) field
+    testAllApis(
+      'f18.extract(TimeIntervalUnit.HOUR),
+      "f18.extract(HOUR)",
+      "EXTRACT(HOUR FROM f18)",
+      "6")
+
+    testAllApis(
+      'f17.extract(TimeIntervalUnit.HOUR),
+      "f17.extract(HOUR)",
+      "EXTRACT(HOUR FROM f17)",
+      "6")
+
+    testAllApis(
+      'f18.extract(TimeIntervalUnit.MINUTE),
+      "f18.extract(MINUTE)",
+      "EXTRACT(MINUTE FROM f18)",
+      "55")
+
+    testAllApis(
+      'f17.extract(TimeIntervalUnit.MINUTE),
+      "f17.extract(MINUTE)",
+      "EXTRACT(MINUTE FROM f17)",
+      "55")
+
+    testAllApis(
+      'f18.extract(TimeIntervalUnit.SECOND),
+      "f18.extract(SECOND)",
+      "EXTRACT(SECOND FROM f18)",
+      "44")
+
+    testAllApis(
+      'f17.extract(TimeIntervalUnit.SECOND),
+      "f17.extract(SECOND)",
+      "EXTRACT(SECOND FROM f17)",
+      "44")
+
+    // millisecond interval
+    testAllApis(
+      'f19.extract(TimeIntervalUnit.DAY),
+      "f19.extract(DAY)",
+      "EXTRACT(DAY FROM f19)",
+      "16979")
+
+    testAllApis(
+      'f19.extract(TimeIntervalUnit.HOUR),
+      "f19.extract(HOUR)",
+      "EXTRACT(HOUR FROM f19)",
+      "7")
+
+    testAllApis(
+      'f19.extract(TimeIntervalUnit.MINUTE),
+      "f19.extract(MINUTE)",
+      "EXTRACT(MINUTE FROM f19)",
+      "23")
+
+    testAllApis(
+      'f19.extract(TimeIntervalUnit.SECOND),
+      "f19.extract(SECOND)",
+      "EXTRACT(SECOND FROM f19)",
+      "33")
+
+    // month interval
+    testAllApis(
+      'f20.extract(TimeIntervalUnit.MONTH),
+      "f20.extract(MONTH)",
+      "EXTRACT(MONTH FROM f20)",
+      "1")
+
+    testAllApis(
+      'f20.extract(TimeIntervalUnit.YEAR),
+      "f20.extract(YEAR)",
+      "EXTRACT(YEAR FROM f20)",
+      "2")
+  }
+
+  /**
+    * Checks `floor` and `ceil` to a time unit on the timestamp (`f18`), time (`f17`) and date
+    * (`f16`) fields.
+    *
+    * Inputs from `testData`: `f16` is 1996-11-10, `f17` is 06:55:44 and `f18` is
+    * 1996-11-10 06:55:44.333.
+    */
+  @Test
+  def testTemporalFloor(): Unit = {
+    // floor on the timestamp field
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.YEAR),
+      "f18.floor(YEAR)",
+      "FLOOR(f18 TO YEAR)",
+      "1996-01-01 00:00:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.MONTH),
+      "f18.floor(MONTH)",
+      "FLOOR(f18 TO MONTH)",
+      "1996-11-01 00:00:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.DAY),
+      "f18.floor(DAY)",
+      "FLOOR(f18 TO DAY)",
+      "1996-11-10 00:00:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.MINUTE),
+      "f18.floor(MINUTE)",
+      "FLOOR(f18 TO MINUTE)",
+      "1996-11-10 06:55:00.0")
+
+    testAllApis(
+      'f18.floor(TimeIntervalUnit.SECOND),
+      "f18.floor(SECOND)",
+      "FLOOR(f18 TO SECOND)",
+      "1996-11-10 06:55:44.0")
+
+    // floor on the time field
+    testAllApis(
+      'f17.floor(TimeIntervalUnit.HOUR),
+      "f17.floor(HOUR)",
+      "FLOOR(f17 TO HOUR)",
+      "06:00:00")
+
+    testAllApis(
+      'f17.floor(TimeIntervalUnit.MINUTE),
+      "f17.floor(MINUTE)",
+      "FLOOR(f17 TO MINUTE)",
+      "06:55:00")
+
+    testAllApis(
+      'f17.floor(TimeIntervalUnit.SECOND),
+      "f17.floor(SECOND)",
+      "FLOOR(f17 TO SECOND)",
+      "06:55:44")
+
+    // floor on the date field
+    testAllApis(
+      'f16.floor(TimeIntervalUnit.YEAR),
+      "f16.floor(YEAR)",
+      "FLOOR(f16 TO YEAR)",
+      "1996-01-01")
+
+    testAllApis(
+      'f16.floor(TimeIntervalUnit.MONTH),
+      "f16.floor(MONTH)",
+      "FLOOR(f16 TO MONTH)",
+      "1996-11-01")
+
+    // ceil on the timestamp field
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.YEAR),
+      "f18.ceil(YEAR)",
+      "CEIL(f18 TO YEAR)",
+      "1997-01-01 00:00:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.MONTH),
+      "f18.ceil(MONTH)",
+      "CEIL(f18 TO MONTH)",
+      "1996-12-01 00:00:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.DAY),
+      "f18.ceil(DAY)",
+      "CEIL(f18 TO DAY)",
+      "1996-11-11 00:00:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.MINUTE),
+      "f18.ceil(MINUTE)",
+      "CEIL(f18 TO MINUTE)",
+      "1996-11-10 06:56:00.0")
+
+    testAllApis(
+      'f18.ceil(TimeIntervalUnit.SECOND),
+      "f18.ceil(SECOND)",
+      "CEIL(f18 TO SECOND)",
+      "1996-11-10 06:55:45.0")
+
+    // ceil on the time field
+    testAllApis(
+      'f17.ceil(TimeIntervalUnit.HOUR),
+      "f17.ceil(HOUR)",
+      "CEIL(f17 TO HOUR)",
+      "07:00:00")
+
+    testAllApis(
+      'f17.ceil(TimeIntervalUnit.MINUTE),
+      "f17.ceil(MINUTE)",
+      "CEIL(f17 TO MINUTE)",
+      "06:56:00")
+
+    // f17 has no fraction of a second, so the value is expected to stay unchanged
+    testAllApis(
+      'f17.ceil(TimeIntervalUnit.SECOND),
+      "f17.ceil(SECOND)",
+      "CEIL(f17 TO SECOND)",
+      "06:55:44")
+
+    // ceil on the date field
+    // NOTE(review): the expected values below equal the FLOOR results for f16 above
+    // (1996-01-01 / 1996-11-01) rather than the next year / month -- confirm this is intended
+    testAllApis(
+      'f16.ceil(TimeIntervalUnit.YEAR),
+      "f16.ceil(YEAR)",
+      "CEIL(f16 TO YEAR)",
+      "1996-01-01")
+
+    testAllApis(
+      'f16.ceil(TimeIntervalUnit.MONTH),
+      "f16.ceil(MONTH)",
+      "CEIL(f16 TO MONTH)",
+      "1996-11-01")
+  }
+
+  /**
+    * Checks the current-time functions in a deterministic way.
+    *
+    * Instead of comparing concrete values, each function result is cast to a string and only
+    * a minimum length is asserted; a final case compares `localTimestamp()` with itself.
+    */
+  @Test
+  def testCurrentTimePoint(): Unit = {
+
+    // current time points are non-deterministic
+    // we just test the format of the output
+    // manual test can be found in NonDeterministicTests
+
+    // date and time strings: at least 5 characters
+    testAllApis(
+      currentDate().cast(Types.STRING).charLength() >= 5,
+      "currentDate().cast(STRING).charLength() >= 5",
+      "CHAR_LENGTH(CAST(CURRENT_DATE AS VARCHAR)) >= 5",
+      "true")
+
+    testAllApis(
+      currentTime().cast(Types.STRING).charLength() >= 5,
+      "currentTime().cast(STRING).charLength() >= 5",
+      "CHAR_LENGTH(CAST(CURRENT_TIME AS VARCHAR)) >= 5",
+      "true")
+
+    // timestamp strings: at least 12 characters
+    testAllApis(
+      currentTimestamp().cast(Types.STRING).charLength() >= 12,
+      "currentTimestamp().cast(STRING).charLength() >= 12",
+      "CHAR_LENGTH(CAST(CURRENT_TIMESTAMP AS VARCHAR)) >= 12",
+      "true")
+
+    testAllApis(
+      localTimestamp().cast(Types.STRING).charLength() >= 12,
+      "localTimestamp().cast(STRING).charLength() >= 12",
+      "CHAR_LENGTH(CAST(LOCALTIMESTAMP AS VARCHAR)) >= 12",
+      "true")
+
+    testAllApis(
+      localTime().cast(Types.STRING).charLength() >= 5,
+      "localTime().cast(STRING).charLength() >= 5",
+      "CHAR_LENGTH(CAST(LOCALTIME AS VARCHAR)) >= 5",
+      "true")
+
+    // comparisons are deterministic
+    testAllApis(
+      localTimestamp() === localTimestamp(),
+      "localTimestamp() === localTimestamp()",
+      "LOCALTIMESTAMP = LOCALTIMESTAMP",
+      "true")
+  }
+
+  /**
+    * Checks the temporal OVERLAPS predicate on time, date and timestamp literals.
+    *
+    * A period is given either as a start point plus an interval or as two time points; the
+    * cases below mix both forms.
+    */
+  @Test
+  def testOverlaps(): Unit = {
+    // two (time point, interval) periods
+    testAllApis(
+      temporalOverlaps("2:55:00".toTime, 1.hour, "3:30:00".toTime, 2.hours),
+      "temporalOverlaps('2:55:00'.toTime, 1.hour, '3:30:00'.toTime, 2.hours)",
+      "(TIME '2:55:00', INTERVAL '1' HOUR) OVERLAPS (TIME '3:30:00', INTERVAL '2' HOUR)",
+      "true")
+
+    // two (time point, time point) periods
+    testAllApis(
+      temporalOverlaps("9:00:00".toTime, "9:30:00".toTime, "9:29:00".toTime, "9:31:00".toTime),
+      "temporalOverlaps('9:00:00'.toTime, '9:30:00'.toTime, '9:29:00'.toTime, '9:31:00'.toTime)",
+      "(TIME '9:00:00', TIME '9:30:00') OVERLAPS (TIME '9:29:00', TIME '9:31:00')",
+      "true")
+
+    // mixed forms; the second period starts after the first one has ended
+    testAllApis(
+      temporalOverlaps("9:00:00".toTime, "10:00:00".toTime, "10:15:00".toTime, 3.hours),
+      "temporalOverlaps('9:00:00'.toTime, '10:00:00'.toTime, '10:15:00'.toTime, 3.hours)",
+      "(TIME '9:00:00', TIME '10:00:00') OVERLAPS (TIME '10:15:00', INTERVAL '3' HOUR)",
+      "false")
+
+    // dates with day intervals
+    testAllApis(
+      temporalOverlaps("2011-03-10".toDate, 10.days, "2011-03-19".toDate, 10.days),
+      "temporalOverlaps('2011-03-10'.toDate, 10.days, '2011-03-19'.toDate, 10.days)",
+      "(DATE '2011-03-10', INTERVAL '10' DAY) OVERLAPS (DATE '2011-03-19', INTERVAL '10' DAY)",
+      "true")
+
+    // timestamps with a zero-length interval; expected not to overlap
+    testAllApis(
+      temporalOverlaps("2011-03-10 05:02:02".toTimestamp, 0.milli,
+        "2011-03-10 05:02:02".toTimestamp, "2011-03-10 05:02:01".toTimestamp),
+      "temporalOverlaps('2011-03-10 05:02:02'.toTimestamp, 0.milli, " +
+        "'2011-03-10 05:02:02'.toTimestamp, '2011-03-10 05:02:01'.toTimestamp)",
+      "(TIMESTAMP '2011-03-10 05:02:02', INTERVAL '0' SECOND) OVERLAPS " +
+        "(TIMESTAMP '2011-03-10 05:02:02', TIMESTAMP '2011-03-10 05:02:01')",
+      "false")
+
+    // TODO enable once CALCITE-1435 is fixed
+    // comparison of timestamps based on milliseconds is buggy
+    //testAllApis(
+    //  temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli,
+    //    "2011-03-10 02:02:02.002".toTimestamp, "2011-03-10 02:02:02.002".toTimestamp),
+    //  "temporalOverlaps('2011-03-10 02:02:02.001'.toTimestamp, 0.milli, " +
+    //    "'2011-03-10 02:02:02.002'.toTimestamp, '2011-03-10 02:02:02.002'.toTimestamp)",
+    //  "(TIMESTAMP '2011-03-10 02:02:02.001', INTERVAL '0' SECOND) OVERLAPS " +
+    //    "(TIMESTAMP '2011-03-10 02:02:02.002', TIMESTAMP '2011-03-10 02:02:02.002')",
+    //  "false")
+  }
+
+  /** Checks the quarter of the year for a date in January, in April and at the end of December. */
+  @Test
+  def testQuarter(): Unit = {
+    // date literal -> expected quarter
+    val quarterByDate = Seq(
+      "1997-01-27" -> "1",
+      "1997-04-27" -> "2",
+      "1997-12-31" -> "4")
+
+    quarterByDate.foreach { case (date, quarter) =>
+      testAllApis(
+        date.toDate.quarter(),
+        s"'$date'.toDate.quarter()",
+        s"QUARTER(DATE '$date')",
+        quarter)
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+  // Other functions
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Checks the boolean predicates `isTrue`, `isFalse`, `isNotTrue` and `isNotFalse`.
+    *
+    * Inputs are `f1` (true), the literal `false` and `f21`, a boolean field that is set to
+    * null in `testData`.
+    */
+  @Test
+  def testIsTrueIsFalse(): Unit = {
+    testAllApis(
+      'f1.isTrue,
+      "f1.isTrue",
+      "f1 IS TRUE",
+      "true")
+
+    // null input: expected to be neither true ...
+    testAllApis(
+      'f21.isTrue,
+      "f21.isTrue",
+      "f21 IS TRUE",
+      "false")
+
+    testAllApis(
+      false.isFalse,
+      "false.isFalse",
+      "FALSE IS FALSE",
+      "true")
+
+    // ... nor false
+    testAllApis(
+      'f21.isFalse,
+      "f21.isFalse",
+      "f21 IS FALSE",
+      "false")
+
+    testAllApis(
+      'f1.isNotTrue,
+      "f1.isNotTrue",
+      "f1 IS NOT TRUE",
+      "false")
+
+    // null input: the negated predicates are expected to hold
+    testAllApis(
+      'f21.isNotTrue,
+      "f21.isNotTrue",
+      "f21 IS NOT TRUE",
+      "true")
+
+    testAllApis(
+      false.isNotFalse,
+      "false.isNotFalse",
+      "FALSE IS NOT FALSE",
+      "false")
+
+    testAllApis(
+      'f21.isNotFalse,
+      "f21.isNotFalse",
+      "f21 IS NOT FALSE",
+      "true")
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /**
+    * Builds the input row whose 25 fields the tests in this class reference as `f0` to `f24`.
+    *
+    * The value at each position has to match the type declared at the same position in
+    * `typeInfo`.
+    */
+  override def testData: Row = {
+    val row = new Row(25)
+    row.setField(0, "This is a test String.")
+    row.setField(1, true)
+    row.setField(2, 42.toByte)
+    row.setField(3, 43.toShort)
+    row.setField(4, 44.toLong)
+    row.setField(5, 4.5.toFloat)
+    row.setField(6, 4.6)
+    row.setField(7, 3)
+    row.setField(8, " This is a test String. ")
+    // negative counterparts of fields 2 to 7
+    row.setField(9, -42.toByte)
+    row.setField(10, -43.toShort)
+    row.setField(11, -44.toLong)
+    row.setField(12, -4.5.toFloat)
+    row.setField(13, -4.6)
+    row.setField(14, -3)
+    row.setField(15, BigDecimal("-1231.1231231321321321111").bigDecimal)
+    row.setField(16, Date.valueOf("1996-11-10"))
+    row.setField(17, Time.valueOf("06:55:44"))
+    row.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
+    row.setField(19, 1467012213000L) // +16979 07:23:33.000
+    row.setField(20, 25) // +2-01
+    // boolean field without a value
+    row.setField(21, null)
+    row.setField(22, BigDecimal("2").bigDecimal)
+    // sentences prefixed with LIKE / SIMILAR TO special characters for the escape tests
+    row.setField(23, "%This is a test String.")
+    row.setField(24, "*_This is a test String.")
+    row
+  }
+
+  /**
+    * Declares the type of every field of the row built by `testData`, position by position.
+    */
+  override def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      Types.STRING, // f0
+      Types.BOOLEAN, // f1
+      Types.BYTE, // f2
+      Types.SHORT, // f3
+      Types.LONG, // f4
+      Types.FLOAT, // f5
+      Types.DOUBLE, // f6
+      Types.INT, // f7
+      Types.STRING, // f8
+      Types.BYTE, // f9
+      Types.SHORT, // f10
+      Types.LONG, // f11
+      Types.FLOAT, // f12
+      Types.DOUBLE, // f13
+      Types.INT, // f14
+      Types.DECIMAL, // f15
+      Types.DATE, // f16
+      Types.TIME, // f17
+      Types.TIMESTAMP, // f18
+      Types.INTERVAL_MILLIS, // f19
+      Types.INTERVAL_MONTHS, // f20
+      Types.BOOLEAN, // f21
+      Types.DECIMAL, // f22
+      Types.STRING, // f23
+      Types.STRING // f24
+    ).asInstanceOf[TypeInformation[Any]]
+  }
+}