Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:40 UTC

[31/44] flink git commit: [FLINK-6617] [table] Restructuring of tests

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala
deleted file mode 100644
index fab6c08..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/batch/table/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.batch.table
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.catalog.utils.ExternalCatalogTestBase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Test for external catalog query plan.
-  */
-class ExternalCatalogTest extends ExternalCatalogTestBase {
-
-  @Test
-  def test(): Unit = {
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val table1 = tEnv.scan("test", "db1", "tb1")
-    val table2 = tEnv.scan("test", "db2", "tb2")
-    val result = table2
-      .select('d * 2, 'e, 'g.upperCase())
-      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
-      ),
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
-      ),
-      term("union", "_c0", "e", "_c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testTopLevelTable(): Unit = {
-    val util = batchTestUtil()
-    val tEnv = util.tableEnv
-
-    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val table1 = tEnv.scan("test", "tb1")
-    val table2 = tEnv.scan("test", "db2", "tb2")
-    val result = table2
-      .select('d * 2, 'e, 'g.upperCase())
-      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
-    val expected = binaryNode(
-      "DataSetUnion",
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
-      ),
-      unaryNode(
-        "DataSetCalc",
-        sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
-        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
-      ),
-      term("union", "_c0", "e", "_c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}
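
For orientation, the flow these catalog plan tests exercise is: register an external catalog under a name, then scan tables through it with a (catalog, database, table) path. A minimal sketch using only APIs that appear in this commit (the class and test names are illustrative, not part of the change):

import org.apache.flink.table.utils.{CommonTestData, TableTestBase}
import org.junit.Test

class CatalogFlowSketch extends TableTestBase {

  @Test
  def testScanThroughCatalog(): Unit = {
    val util = batchTestUtil()
    val tEnv = util.tableEnv
    // Register the in-memory test catalog, then address tables by path.
    tEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
    val table = tEnv.scan("test", "db1", "tb1") // catalog, database, table
    // A two-part path such as ("test", "tb1") addresses a top-level table,
    // as testTopLevelTable above verifies.
  }
}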

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala
deleted file mode 100644
index 786c4a1..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/sql/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.stream.sql
-
-import org.apache.flink.table.catalog.utils.ExternalCatalogTestBase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Test for external catalog query plan.
-  */
-class ExternalCatalogTest extends ExternalCatalogTestBase {
-
-  @Test
-  def test(): Unit = {
-    val util = streamTestUtil()
-
-    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
-        "(SELECT a * 2, b, c FROM test.db1.tb1)"
-
-    val expected = binaryNode(
-      "DataStreamUnion",
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
-        term("where", "<(d, 3)")),
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS EXPR$0", "b", "c")
-      ),
-      term("union all", "EXPR$0", "e", "g"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala
deleted file mode 100644
index 79d3f99..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/stream/table/ExternalCatalogTest.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.catalog.stream.table
-
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.catalog.utils.ExternalCatalogTestBase
-import org.apache.flink.table.utils.CommonTestData
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-/**
-  * Test for external catalog query plan.
-  */
-class ExternalCatalogTest extends ExternalCatalogTestBase {
-
-  @Test
-  def testStreamTableApi(): Unit = {
-    val util = streamTestUtil()
-    val tEnv = util.tableEnv
-
-    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
-
-    val table1 = tEnv.scan("test", "db1", "tb1")
-    val table2 = tEnv.scan("test", "db2", "tb2")
-
-    val result = table2.where("d < 3")
-      .select('d * 2, 'e, 'g.upperCase())
-      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
-
-    val expected = binaryNode(
-      "DataStreamUnion",
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table2Path, table2ProjectedFields),
-        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
-        term("where", "<(d, 3)")
-      ),
-      unaryNode(
-        "DataStreamCalc",
-        sourceStreamTableNode(table1Path, table1ProjectedFields),
-        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
-      ),
-      term("union all", "_c0", "e", "_c2")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala
deleted file mode 100644
index 19fbd36..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/catalog/utils/ExternalCatalogTestBase.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.catalog.utils
-
-import org.apache.flink.table.utils.TableTestBase
-
-class ExternalCatalogTestBase extends TableTestBase {
-  protected val table1Path: Array[String] = Array("test", "db1", "tb1")
-  protected val table1TopLevelPath: Array[String] = Array("test", "tb1")
-  protected val table1ProjectedFields: Array[String] = Array("a", "b", "c")
-  protected val table2Path: Array[String] = Array("test", "db2", "tb2")
-  protected val table2ProjectedFields: Array[String] = Array("d", "e", "g")
-
-  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
-    s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
-      s"fields=[${fields.mkString(", ")}])"
-  }
-
-  def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
-    s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
-      s"fields=[${fields.mkString(", ")}])"
-  }
-}
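
The expected-plan strings in these tests are assembled purely by string composition: sourceBatchTableNode/sourceStreamTableNode render the scan node, and unaryNode/binaryNode/term from TableTestUtil nest it inside the surrounding operators. A short sketch of that composition, assuming it sits in a subclass of the base class above (the rendered string in the comment is approximate):

import org.apache.flink.table.utils.TableTestUtil._

class PlanStringSketch extends ExternalCatalogTestBase {
  // Renders roughly as:
  //   DataSetCalc(BatchTableSourceScan(table=[[test, db1, tb1]],
  //     fields=[a, b, c]), select=[a, b, c], where=[>(a, 8)])
  val expected: String = unaryNode(
    "DataSetCalc",
    sourceBatchTableNode(table1Path, table1ProjectedFields),
    term("select", "a", "b", "c"),
    term("where", ">(a, 8)"))
}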

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
index 558e5b3..bd1c199 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/CompositeAccessTest.scala
@@ -19,10 +19,10 @@
 package org.apache.flink.table.expressions
 
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.CompositeAccessTestBase
+import org.apache.flink.table.expressions.utils.CompositeTypeTestBase
 import org.junit.Test
 
-class CompositeAccessTest extends CompositeAccessTestBase {
+class CompositeAccessTest extends CompositeTypeTestBase {
 
   @Test
   def testGetField(): Unit = {
@@ -120,7 +120,6 @@ class CompositeAccessTest extends CompositeAccessTestBase {
 
     testTableApi('f5.flatten(), "f5.flatten()", "13")
   }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index 7b09733..b2b8016 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -31,5 +31,4 @@ class MapTypeTest extends MapTypeTestBase {
     testSqlApi("f3[1]", "null")
     testSqlApi("f3[12]", "a")
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
index bcc53af..d3b606b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/NonDeterministicTests.scala
@@ -82,7 +82,7 @@ class NonDeterministicTests extends ExpressionTestBase {
 
   // ----------------------------------------------------------------------------------------------
 
-  override def testData: Any = new Row(0)
+  override def testData: Row = new Row(0)
 
   override def typeInfo: TypeInformation[Any] =
     new RowTypeInfo().asInstanceOf[TypeInformation[Any]]

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index ef021b7..02745b5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.expressions
 
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.ScalarFunctionsTestBase
+import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
-class ScalarFunctionsTest extends ScalarFunctionsTestBase {
+class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   // ----------------------------------------------------------------------------------------------
   // String functions
@@ -347,7 +347,6 @@ class ScalarFunctionsTest extends ScalarFunctionsTestBase {
       "mod(44, 3)",
       "MOD(44, 3)",
       "2")
-
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
index 30f8a4c..738413e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarOperatorsTest.scala
@@ -18,9 +18,13 @@
 
 package org.apache.flink.table.expressions
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.{ScalarOperatorsTestBase, ShouldNotExecuteFunc}
+import org.apache.flink.table.expressions.utils.{ExpressionTestBase, ScalarOperatorsTestBase, ShouldNotExecuteFunc}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.types.Row
 import org.junit.Test
 
 class ScalarOperatorsTest extends ScalarOperatorsTestBase {
@@ -211,5 +215,4 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
       "trueX")
     testTableApi(12.isNull, "12.isNull", "false")
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
index 5cc8bd1..c631212 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala
@@ -114,6 +114,7 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("SIGN(-1.1)", "-1")
     testSqlApi("ROUND(-12.345, 2)", "-12.35")
     testSqlApi("PI", "3.141592653589793")
+    testSqlApi("E()", "2.718281828459045")
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
index 4f47fd5..1d761c3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/TemporalTypesTest.scala
@@ -540,7 +540,7 @@ class TemporalTypesTest extends ExpressionTestBase {
 
   // ----------------------------------------------------------------------------------------------
 
-  def testData = {
+  def testData: Row = {
     val testData = new Row(11)
     testData.setField(0, Date.valueOf("1990-10-14"))
     testData.setField(1, Time.valueOf("10:20:45"))
@@ -556,7 +556,7 @@ class TemporalTypesTest extends ExpressionTestBase {
     testData
   }
 
-  def typeInfo = {
+  def typeInfo: TypeInformation[Any] = {
     new RowTypeInfo(
       Types.SQL_DATE,
       Types.SQL_TIME,

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index f0432cf..9b3407e 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -25,9 +25,9 @@ import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions._
+import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions._
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils._
+import org.apache.flink.table.expressions.utils.{ExpressionTestBase, _}
 import org.apache.flink.table.functions.ScalarFunction
 import org.junit.Test
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala
deleted file mode 100644
index c132335..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      )
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "*" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testNestedTablesReduction(): Unit = {
-    val util = batchTestUtil()
-
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
-    util.tableEnv.registerTable("NewTable", newTable)
-
-    val sqlQuery = "SELECT a FROM NewTable"
-
-    // 1+1 should be normalized to 2
-    val expected = unaryNode("DataSetCalc", batchTableNode(0), term("select", "+(2, a) AS a"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala
deleted file mode 100644
index 595063a..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/batch/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala
deleted file mode 100644
index 0a22d8c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/sql/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.stream.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      )
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "*" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testNestedTablesReduction(): Unit = {
-    val util = streamTestUtil()
-
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
-    util.tableEnv.registerTable("NewTable", newTable)
-
-    val sqlQuery = "SELECT a FROM NewTable"
-
-    // 1+1 should be normalized to 2
-    val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala
deleted file mode 100644
index a62bcd3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/stream/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.expressions.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.Test
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpression(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceProjectExpression(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result =  table
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceFilterExpression(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testNestedTablesReduction(): Unit = {
-    val util = streamTestUtil()
-
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
-    util.tableEnv.registerTable("NewTable", newTable)
-
-    val sqlQuery = "SELECT a FROM NewTable"
-
-    // 1+1 should be normalized to 2
-    val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala
deleted file mode 100644
index 7243f1d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeAccessTestBase.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.utils
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
-import org.apache.flink.api.scala.createTypeInformation
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.expressions.utils.CompositeAccessTestBase.{MyCaseClass, MyCaseClass2, MyPojo}
-import org.apache.flink.types.Row
-
-class CompositeAccessTestBase extends ExpressionTestBase {
-
-  def testData = {
-    val testData = new Row(8)
-    testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
-    testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
-    testData.setField(2, ("a", "b"))
-    testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
-    testData.setField(4, new MyPojo())
-    testData.setField(5, 13)
-    testData.setField(6, MyCaseClass2(null))
-    testData.setField(7, Tuple1(true))
-    testData
-  }
-
-  def typeInfo = {
-    new RowTypeInfo(
-      createTypeInformation[MyCaseClass],
-      createTypeInformation[MyCaseClass2],
-      createTypeInformation[(String, String)],
-      new TupleTypeInfo(Types.STRING, Types.STRING),
-      TypeExtractor.createTypeInfo(classOf[MyPojo]),
-      Types.INT,
-      createTypeInformation[MyCaseClass2],
-      createTypeInformation[Tuple1[Boolean]]
-      ).asInstanceOf[TypeInformation[Any]]
-  }
-
-}
-
-object CompositeAccessTestBase {
-  case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
-
-  case class MyCaseClass2(objectField: MyCaseClass)
-
-  class MyPojo {
-    private var myInt: Int = 0
-    private var myString: String = "Hello"
-
-    def getMyInt = myInt
-
-    def setMyInt(value: Int) = {
-      myInt = value
-    }
-
-    def getMyString = myString
-
-    def setMyString(value: String) = {
-      myString = myString
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala
new file mode 100644
index 0000000..8f7360e
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/CompositeTypeTestBase.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo, TypeExtractor}
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.expressions.utils.CompositeTypeTestBase.{MyCaseClass, MyCaseClass2, MyPojo}
+import org.apache.flink.types.Row
+
+class CompositeTypeTestBase extends ExpressionTestBase {
+
+  def testData: Row = {
+    val testData = new Row(8)
+    testData.setField(0, MyCaseClass(42, "Bob", booleanField = true))
+    testData.setField(1, MyCaseClass2(MyCaseClass(25, "Timo", booleanField = false)))
+    testData.setField(2, ("a", "b"))
+    testData.setField(3, new org.apache.flink.api.java.tuple.Tuple2[String, String]("a", "b"))
+    testData.setField(4, new MyPojo())
+    testData.setField(5, 13)
+    testData.setField(6, MyCaseClass2(null))
+    testData.setField(7, Tuple1(true))
+    testData
+  }
+
+  def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      createTypeInformation[MyCaseClass],
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[(String, String)],
+      new TupleTypeInfo(Types.STRING, Types.STRING),
+      TypeExtractor.createTypeInfo(classOf[MyPojo]),
+      Types.INT,
+      createTypeInformation[MyCaseClass2],
+      createTypeInformation[Tuple1[Boolean]]
+      ).asInstanceOf[TypeInformation[Any]]
+  }
+}
+
+object CompositeTypeTestBase {
+  case class MyCaseClass(intField: Int, stringField: String, booleanField: Boolean)
+
+  case class MyCaseClass2(objectField: MyCaseClass)
+
+  class MyPojo {
+    private var myInt: Int = 0
+    private var myString: String = "Hello"
+
+    def getMyInt: Int = myInt
+
+    def setMyInt(value: Int): Unit = {
+      myInt = value
+    }
+
+    def getMyString: String = myString
+
+    def setMyString(value: String): Unit = {
+      myString = value
+    }
+  }
+}
+
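
The renamed base class is consumed just as before: a test extends it and checks a Table API expression, its string-expression form, and the expected result against the Row built in testData. A minimal sketch (both assertions mirror CompositeAccessTest earlier in this commit; the values follow testData above):

import org.apache.flink.table.api.scala._
import org.junit.Test

class CompositeSketch extends CompositeTypeTestBase {

  @Test
  def testFieldAccess(): Unit = {
    // f0 is MyCaseClass(42, "Bob", true); all three forms must print "42".
    testTableApi('f0.get("intField"), "f0.get('intField')", "42")
    // f5 is the plain INT 13; flatten() of an atomic type is the value itself.
    testTableApi('f5.flatten(), "f5.flatten()", "13")
  }
}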

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
index eacf052..d90d9df 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/MapTypeTestBase.scala
@@ -49,5 +49,4 @@ class MapTypeTestBase extends ExpressionTestBase {
       new MapTypeInfo(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)
     ).asInstanceOf[TypeInformation[Any]]
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala
deleted file mode 100644
index c526a01..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarFunctionsTestBase.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.utils
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.table.api.Types
-import org.apache.flink.types.Row
-
-class ScalarFunctionsTestBase extends ExpressionTestBase {
-
-  def testData = {
-    val testData = new Row(33)
-    testData.setField(0, "This is a test String.")
-    testData.setField(1, true)
-    testData.setField(2, 42.toByte)
-    testData.setField(3, 43.toShort)
-    testData.setField(4, 44.toLong)
-    testData.setField(5, 4.5.toFloat)
-    testData.setField(6, 4.6)
-    testData.setField(7, 3)
-    testData.setField(8, " This is a test String. ")
-    testData.setField(9, -42.toByte)
-    testData.setField(10, -43.toShort)
-    testData.setField(11, -44.toLong)
-    testData.setField(12, -4.5.toFloat)
-    testData.setField(13, -4.6)
-    testData.setField(14, -3)
-    testData.setField(15, BigDecimal("-1231.1231231321321321111").bigDecimal)
-    testData.setField(16, Date.valueOf("1996-11-10"))
-    testData.setField(17, Time.valueOf("06:55:44"))
-    testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
-    testData.setField(19, 1467012213000L) // +16979 07:23:33.000
-    testData.setField(20, 25) // +2-01
-    testData.setField(21, null)
-    testData.setField(22, BigDecimal("2").bigDecimal)
-    testData.setField(23, "%This is a test String.")
-    testData.setField(24, "*_This is a test String.")
-    testData.setField(25, 0.42.toByte)
-    testData.setField(26, 0.toShort)
-    testData.setField(27, 0.toLong)
-    testData.setField(28, 0.45.toFloat)
-    testData.setField(29, 0.46)
-    testData.setField(30, 1)
-    testData.setField(31, BigDecimal("-0.1231231321321321111").bigDecimal)
-    testData.setField(32, -1)
-    testData
-  }
-
-  def typeInfo = {
-    new RowTypeInfo(
-      Types.STRING,
-      Types.BOOLEAN,
-      Types.BYTE,
-      Types.SHORT,
-      Types.LONG,
-      Types.FLOAT,
-      Types.DOUBLE,
-      Types.INT,
-      Types.STRING,
-      Types.BYTE,
-      Types.SHORT,
-      Types.LONG,
-      Types.FLOAT,
-      Types.DOUBLE,
-      Types.INT,
-      Types.DECIMAL,
-      Types.SQL_DATE,
-      Types.SQL_TIME,
-      Types.SQL_TIMESTAMP,
-      Types.INTERVAL_MILLIS,
-      Types.INTERVAL_MONTHS,
-      Types.BOOLEAN,
-      Types.DECIMAL,
-      Types.STRING,
-      Types.STRING,
-      Types.BYTE,
-      Types.SHORT,
-      Types.LONG,
-      Types.FLOAT,
-      Types.DOUBLE,
-      Types.INT,
-      Types.DECIMAL,
-      Types.INT).asInstanceOf[TypeInformation[Any]]
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
index 0a77d29..2d22843 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarOperatorsTestBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.types.Row
 
 class ScalarOperatorsTestBase extends ExpressionTestBase {
 
-  def testData = {
+  def testData: Row = {
     val testData = new Row(13)
     testData.setField(0, 1: Byte)
     testData.setField(1, 1: Short)
@@ -44,7 +44,7 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
     testData
   }
 
-  def typeInfo = {
+  def typeInfo: TypeInformation[Any] = {
     new RowTypeInfo(
       Types.BYTE,
       Types.SHORT,
@@ -65,5 +65,4 @@ class ScalarOperatorsTestBase extends ExpressionTestBase {
   override def functions: Map[String, ScalarFunction] = Map(
     "shouldNotExecuteFunc" -> ShouldNotExecuteFunc
   )
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
new file mode 100644
index 0000000..41fe52c
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/ScalarTypesTestBase.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.table.api.Types
+import org.apache.flink.types.Row
+
+class ScalarTypesTestBase extends ExpressionTestBase {
+
+  def testData: Row = {
+    val testData = new Row(34)
+    testData.setField(0, "This is a test String.")
+    testData.setField(1, true)
+    testData.setField(2, 42.toByte)
+    testData.setField(3, 43.toShort)
+    testData.setField(4, 44.toLong)
+    testData.setField(5, 4.5.toFloat)
+    testData.setField(6, 4.6)
+    testData.setField(7, 3)
+    testData.setField(8, " This is a test String. ")
+    testData.setField(9, -42.toByte)
+    testData.setField(10, -43.toShort)
+    testData.setField(11, -44.toLong)
+    testData.setField(12, -4.5.toFloat)
+    testData.setField(13, -4.6)
+    testData.setField(14, -3)
+    testData.setField(15, BigDecimal("-1231.1231231321321321111").bigDecimal)
+    testData.setField(16, Date.valueOf("1996-11-10"))
+    testData.setField(17, Time.valueOf("06:55:44"))
+    testData.setField(18, Timestamp.valueOf("1996-11-10 06:55:44.333"))
+    testData.setField(19, 1467012213000L) // +16979 07:23:33.000
+    testData.setField(20, 25) // +2-01
+    testData.setField(21, null)
+    testData.setField(22, BigDecimal("2").bigDecimal)
+    testData.setField(23, "%This is a test String.")
+    testData.setField(24, "*_This is a test String.")
+    testData.setField(25, 0.42.toByte)
+    testData.setField(26, 0.toShort)
+    testData.setField(27, 0.toLong)
+    testData.setField(28, 0.45.toFloat)
+    testData.setField(29, 0.46)
+    testData.setField(30, 1)
+    testData.setField(31, BigDecimal("-0.1231231321321321111").bigDecimal)
+    testData.setField(32, -1)
+    testData.setField(33, null)
+    testData
+  }
+
+  def typeInfo: TypeInformation[Any] = {
+    new RowTypeInfo(
+      Types.STRING,
+      Types.BOOLEAN,
+      Types.BYTE,
+      Types.SHORT,
+      Types.LONG,
+      Types.FLOAT,
+      Types.DOUBLE,
+      Types.INT,
+      Types.STRING,
+      Types.BYTE,
+      Types.SHORT,
+      Types.LONG,
+      Types.FLOAT,
+      Types.DOUBLE,
+      Types.INT,
+      Types.DECIMAL,
+      Types.SQL_DATE,
+      Types.SQL_TIME,
+      Types.SQL_TIMESTAMP,
+      Types.INTERVAL_MILLIS,
+      Types.INTERVAL_MONTHS,
+      Types.BOOLEAN,
+      Types.DECIMAL,
+      Types.STRING,
+      Types.STRING,
+      Types.BYTE,
+      Types.SHORT,
+      Types.LONG,
+      Types.FLOAT,
+      Types.DOUBLE,
+      Types.INT,
+      Types.DECIMAL,
+      Types.INT,
+      Types.STRING).asInstanceOf[TypeInformation[Any]]
+  }
+}
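
A hedged sketch of how this fixture is consumed (subclass and test name are
invented; testSqlApi is the ExpressionTestBase helper used by the suites in
this commit, assumed to take a SQL expression and its expected string result):

    import org.junit.Test

    class ScalarTypesExampleTest extends ScalarTypesTestBase {

      @Test
      def testUpperCase(): Unit = {
        // f0 holds "This is a test String." in the testData row above.
        testSqlApi("UPPER(f0)", "THIS IS A TEST STRING.")
      }
    }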

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
deleted file mode 100644
index 5285569..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions.utils
-
-import java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.functions.{ScalarFunction, FunctionContext}
-import org.junit.Assert
-
-import scala.annotation.varargs
-import scala.collection.mutable
-import scala.io.Source
-
-case class SimplePojo(name: String, age: Int)
-
-object Func0 extends ScalarFunction {
-  def eval(index: Int): Int = {
-    index
-  }
-}
-
-object Func1 extends ScalarFunction {
-  def eval(index: Integer): Integer = {
-    index + 1
-  }
-}
-
-object Func2 extends ScalarFunction {
-  def eval(index: Integer, str: String, pojo: SimplePojo): String = {
-    s"$index and $str and $pojo"
-  }
-}
-
-object Func3 extends ScalarFunction {
-  def eval(index: Integer, str: String): String = {
-    s"$index and $str"
-  }
-}
-
-object Func4 extends ScalarFunction {
-  def eval(): Integer = {
-    null
-  }
-}
-
-object Func5 extends ScalarFunction {
-  def eval(): Int = {
-    -1
-  }
-}
-
-object Func6 extends ScalarFunction {
-  def eval(date: Date, time: Time, timestamp: Timestamp): (Date, Time, Timestamp) = {
-    (date, time, timestamp)
-  }
-}
-
-object Func7 extends ScalarFunction {
-  def eval(a: Integer, b: Integer): Integer = {
-    a + b
-  }
-}
-
-object Func8 extends ScalarFunction {
-  def eval(a: Int): String = {
-    "a"
-  }
-
-  def eval(a: Int, b: Int): String = {
-    "b"
-  }
-
-  def eval(a: String, b: String): String = {
-    "c"
-  }
-}
-
-object Func9 extends ScalarFunction {
-  def eval(a: Int, b: Int, c: Long): String = {
-    s"$a and $b and $c"
-  }
-}
-
-object Func10 extends ScalarFunction {
-  def eval(c: Long): Long = {
-    c
-  }
-
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-    Types.SQL_TIMESTAMP
-  }
-}
-
-object Func11 extends ScalarFunction {
-  def eval(a: Int, b: Long): String = {
-    s"$a and $b"
-  }
-}
-
-object Func12 extends ScalarFunction {
-  def eval(a: Long): Long = {
-    a
-  }
-
-  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
-    Types.INTERVAL_MILLIS
-  }
-}
-
-object ShouldNotExecuteFunc extends ScalarFunction {
-  def eval(s: String): Boolean = {
-    throw new Exception("This func should never be executed")
-  }
-}
-
-class RichFunc0 extends ScalarFunction {
-  var openCalled = false
-  var closeCalled = false
-
-  override def open(context: FunctionContext): Unit = {
-    super.open(context)
-    if (openCalled) {
-      Assert.fail("Open called more than once.")
-    } else {
-      openCalled = true
-    }
-    if (closeCalled) {
-      Assert.fail("Close called before open.")
-    }
-  }
-
-  def eval(index: Int): Int = {
-    if (!openCalled) {
-      Assert.fail("Open was not called before eval.")
-    }
-    if (closeCalled) {
-      Assert.fail("Close called before eval.")
-    }
-
-    index + 1
-  }
-
-  override def close(): Unit = {
-    super.close()
-    if (closeCalled) {
-      Assert.fail("Close called more than once.")
-    } else {
-      closeCalled = true
-    }
-    if (!openCalled) {
-      Assert.fail("Open was not called before close.")
-    }
-  }
-}
-
-class RichFunc1 extends ScalarFunction {
-  var added = Int.MaxValue
-
-  override def open(context: FunctionContext): Unit = {
-    added = context.getJobParameter("int.value", "0").toInt
-  }
-
-  def eval(index: Int): Int = {
-    index + added
-  }
-
-  override def close(): Unit = {
-    added = Int.MaxValue
-  }
-}
-
-class RichFunc2 extends ScalarFunction {
-  var prefix = "ERROR_VALUE"
-
-  override def open(context: FunctionContext): Unit = {
-    prefix = context.getJobParameter("string.value", "")
-  }
-
-  def eval(value: String): String = {
-    prefix + "#" + value
-  }
-
-  override def close(): Unit = {
-    prefix = "ERROR_VALUE"
-  }
-}
-
-class RichFunc3 extends ScalarFunction {
-  private val words = mutable.HashSet[String]()
-
-  override def open(context: FunctionContext): Unit = {
-    val file = context.getCachedFile("words")
-    for (line <- Source.fromFile(file.getCanonicalPath).getLines) {
-      words.add(line.trim)
-    }
-  }
-
-  def eval(value: String): Boolean = {
-    words.contains(value)
-  }
-
-  override def close(): Unit = {
-    words.clear()
-  }
-}
-
-class Func13(prefix: String) extends ScalarFunction {
-  def eval(a: String): String = {
-    s"$prefix-$a"
-  }
-}
-
-object Func14 extends ScalarFunction {
-
-  @varargs
-  def eval(a: Int*): Int = {
-    a.sum
-  }
-}
-
-object Func15 extends ScalarFunction {
-
-  @varargs
-  def eval(a: String, b: Int*): String = {
-    a + b.length
-  }
-
-  def eval(a: String): String = {
-    a
-  }
-}
-
-object Func16 extends ScalarFunction {
-
-  def eval(a: Seq[String]): String = {
-    a.mkString(", ")
-  }
-}
-
-object Func17 extends ScalarFunction {
-
-  // Intentionally lacks @varargs; calling this function is expected to throw an exception
-  def eval(a: String*): String = {
-    a.mkString(", ")
-  }
-}
-
-object Func18 extends ScalarFunction {
-  def eval(str: String, prefix: String): Boolean = {
-    str.startsWith(prefix)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
new file mode 100644
index 0000000..5285569
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/userDefinedScalarFunctions.scala
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions.utils
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.{ScalarFunction, FunctionContext}
+import org.junit.Assert
+
+import scala.annotation.varargs
+import scala.collection.mutable
+import scala.io.Source
+
+case class SimplePojo(name: String, age: Int)
+
+object Func0 extends ScalarFunction {
+  def eval(index: Int): Int = {
+    index
+  }
+}
+
+object Func1 extends ScalarFunction {
+  def eval(index: Integer): Integer = {
+    index + 1
+  }
+}
+
+object Func2 extends ScalarFunction {
+  def eval(index: Integer, str: String, pojo: SimplePojo): String = {
+    s"$index and $str and $pojo"
+  }
+}
+
+object Func3 extends ScalarFunction {
+  def eval(index: Integer, str: String): String = {
+    s"$index and $str"
+  }
+}
+
+object Func4 extends ScalarFunction {
+  def eval(): Integer = {
+    null
+  }
+}
+
+object Func5 extends ScalarFunction {
+  def eval(): Int = {
+    -1
+  }
+}
+
+object Func6 extends ScalarFunction {
+  def eval(date: Date, time: Time, timestamp: Timestamp): (Date, Time, Timestamp) = {
+    (date, time, timestamp)
+  }
+}
+
+object Func7 extends ScalarFunction {
+  def eval(a: Integer, b: Integer): Integer = {
+    a + b
+  }
+}
+
+object Func8 extends ScalarFunction {
+  def eval(a: Int): String = {
+    "a"
+  }
+
+  def eval(a: Int, b: Int): String = {
+    "b"
+  }
+
+  def eval(a: String, b: String): String = {
+    "c"
+  }
+}
+
+object Func9 extends ScalarFunction {
+  def eval(a: Int, b: Int, c: Long): String = {
+    s"$a and $b and $c"
+  }
+}
+
+object Func10 extends ScalarFunction {
+  def eval(c: Long): Long = {
+    c
+  }
+
+  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+    Types.SQL_TIMESTAMP
+  }
+}
+
+object Func11 extends ScalarFunction {
+  def eval(a: Int, b: Long): String = {
+    s"$a and $b"
+  }
+}
+
+object Func12 extends ScalarFunction {
+  def eval(a: Long): Long = {
+    a
+  }
+
+  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+    Types.INTERVAL_MILLIS
+  }
+}
+
+object ShouldNotExecuteFunc extends ScalarFunction {
+  def eval(s: String): Boolean = {
+    throw new Exception("This func should never be executed")
+  }
+}
+
+class RichFunc0 extends ScalarFunction {
+  var openCalled = false
+  var closeCalled = false
+
+  override def open(context: FunctionContext): Unit = {
+    super.open(context)
+    if (openCalled) {
+      Assert.fail("Open called more than once.")
+    } else {
+      openCalled = true
+    }
+    if (closeCalled) {
+      Assert.fail("Close called before open.")
+    }
+  }
+
+  def eval(index: Int): Int = {
+    if (!openCalled) {
+      Assert.fail("Open was not called before eval.")
+    }
+    if (closeCalled) {
+      Assert.fail("Close called before eval.")
+    }
+
+    index + 1
+  }
+
+  override def close(): Unit = {
+    super.close()
+    if (closeCalled) {
+      Assert.fail("Close called more than once.")
+    } else {
+      closeCalled = true
+    }
+    if (!openCalled) {
+      Assert.fail("Open was not called before close.")
+    }
+  }
+}
+
+class RichFunc1 extends ScalarFunction {
+  var added = Int.MaxValue
+
+  override def open(context: FunctionContext): Unit = {
+    added = context.getJobParameter("int.value", "0").toInt
+  }
+
+  def eval(index: Int): Int = {
+    index + added
+  }
+
+  override def close(): Unit = {
+    added = Int.MaxValue
+  }
+}
+
+class RichFunc2 extends ScalarFunction {
+  var prefix = "ERROR_VALUE"
+
+  override def open(context: FunctionContext): Unit = {
+    prefix = context.getJobParameter("string.value", "")
+  }
+
+  def eval(value: String): String = {
+    prefix + "#" + value
+  }
+
+  override def close(): Unit = {
+    prefix = "ERROR_VALUE"
+  }
+}
+
+class RichFunc3 extends ScalarFunction {
+  private val words = mutable.HashSet[String]()
+
+  override def open(context: FunctionContext): Unit = {
+    val file = context.getCachedFile("words")
+    for (line <- Source.fromFile(file.getCanonicalPath).getLines) {
+      words.add(line.trim)
+    }
+  }
+
+  def eval(value: String): Boolean = {
+    words.contains(value)
+  }
+
+  override def close(): Unit = {
+    words.clear()
+  }
+}
+
+class Func13(prefix: String) extends ScalarFunction {
+  def eval(a: String): String = {
+    s"$prefix-$a"
+  }
+}
+
+object Func14 extends ScalarFunction {
+
+  @varargs
+  def eval(a: Int*): Int = {
+    a.sum
+  }
+}
+
+object Func15 extends ScalarFunction {
+
+  @varargs
+  def eval(a: String, b: Int*): String = {
+    a + b.length
+  }
+
+  def eval(a: String): String = {
+    a
+  }
+}
+
+object Func16 extends ScalarFunction {
+
+  def eval(a: Seq[String]): String = {
+    a.mkString(", ")
+  }
+}
+
+object Func17 extends ScalarFunction {
+
+  // Intentionally lacks @varargs; calling this function is expected to throw an exception
+  def eval(a: String*): String = {
+    a.mkString(", ")
+  }
+}
+
+object Func18 extends ScalarFunction {
+  def eval(str: String, prefix: String): Boolean = {
+    str.startsWith(prefix)
+  }
+}
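
A minimal sketch of how these fixtures are typically wired into a test,
assuming the TableTestBase utilities (batchTestUtil, addTable) used elsewhere
in this commit; the table and registered names are invented:

    import org.apache.flink.table.api.scala._
    import org.apache.flink.table.utils.TableTestBase

    class UdfUsageExample extends TableTestBase {
      val util = batchTestUtil()
      val t = util.addTable[(Int, String)]("T", 'a, 'b)

      // Scala objects are applied directly in Table API expressions ...
      val selected = t.select(Func0('a), Func18('b, "This"))

      // ... while instantiable functions are registered by name for SQL.
      util.tableEnv.registerFunction("Func13", new Func13("Sunny"))
    }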

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
index 5542533..829fe65 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/CompositeAccessValidationTest.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.expressions.validation
 
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.CompositeAccessTestBase
+import org.apache.flink.table.expressions.utils.CompositeTypeTestBase
 import org.junit.Test
 
-class CompositeAccessValidationTest extends CompositeAccessTestBase {
+class CompositeAccessValidationTest extends CompositeTypeTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testWrongSqlFieldFull(): Unit = {
@@ -54,7 +54,6 @@ class CompositeAccessValidationTest extends CompositeAccessTestBase {
   def testWrongStringKeyField2(): Unit = {
     testTableApi("fail", "f0.get('fghj')", "fail")
   }
-
 }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
index 819dbbce..deb4518 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarFunctionsValidationTest.scala
@@ -20,10 +20,10 @@ package org.apache.flink.table.expressions.validation
 
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.utils.ScalarFunctionsTestBase
+import org.apache.flink.table.expressions.utils.ScalarTypesTestBase
 import org.junit.Test
 
-class ScalarFunctionsValidationTest extends ScalarFunctionsTestBase {
+class ScalarFunctionsValidationTest extends ScalarTypesTestBase {
 
   // ----------------------------------------------------------------------------------------------
   // String functions
@@ -40,5 +40,4 @@ class ScalarFunctionsValidationTest extends ScalarFunctionsTestBase {
     // Must fail. The parameter of substring must be an Integer, not a String.
     testTableApi("test".substring("test".toExpr), "FAIL", "FAIL")
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
index 68c0b3f..9dcc990 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/validation/ScalarOperatorsValidationTest.scala
@@ -44,5 +44,4 @@ class ScalarOperatorsValidationTest extends ScalarOperatorsTestBase {
   def testInvalidStringComparison2(): Unit = {
     testTableApi("w" > 4.toExpr, "FAIL", "FAIL")
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
deleted file mode 100644
index 39b9ec3..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AggFunctionTestBase.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.lang.reflect.Method
-import java.math.BigDecimal
-import java.util.{ArrayList => JArrayList, List => JList}
-import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
-import org.junit.Assert.assertEquals
-import org.junit.Test
-
-/**
-  * Base class for aggregate function tests
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class AggFunctionTestBase[T, ACC] {
-  def inputValueSets: Seq[Seq[_]]
-
-  def expectedResults: Seq[T]
-
-  def aggregator: AggregateFunction[T, ACC]
-
-  val accType = aggregator.getClass.getMethod("createAccumulator").getReturnType
-
-  def accumulateFunc: Method = aggregator.getClass.getMethod("accumulate", accType, classOf[Any])
-
-  def retractFunc: Method = null
-
-  @Test
-  // test aggregate and retract functions without partial merge
-  def testAccumulateAndRetractWithoutMerge(): Unit = {
-    // iterate over input sets
-    for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-      val accumulator = accumulateVals(vals)
-      val result = aggregator.getValue(accumulator)
-      validateResult[T](expected, result)
-
-      if (ifMethodExistInFunction("retract", aggregator)) {
-        retractVals(accumulator, vals)
-        val expectedAccum = aggregator.createAccumulator()
-      //The two accumulators should be exactly the same
-        validateResult[ACC](expectedAccum, accumulator)
-      }
-    }
-  }
-
-  @Test
-  def testAggregateWithMerge(): Unit = {
-
-    if (ifMethodExistInFunction("merge", aggregator)) {
-      val mergeFunc =
-        aggregator.getClass.getMethod("merge", accType, classOf[java.lang.Iterable[ACC]])
-      // iterate over input sets
-      for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-        // split the vals sequence into two equal halves
-        val (firstVals, secondVals) = vals.splitAt(vals.length / 2)
-
-        // 1. verify merge with accumulate
-        val accumulators: JList[ACC] = new JArrayList[ACC]()
-        accumulators.add(accumulateVals(secondVals))
-
-        val acc = accumulateVals(firstVals)
-
-        mergeFunc.invoke(aggregator, acc.asInstanceOf[Object], accumulators)
-        val result = aggregator.getValue(acc)
-        validateResult[T](expected, result)
-
-        // 2. verify merge with accumulate & retract
-        if (ifMethodExistInFunction("retract", aggregator)) {
-          retractVals(acc, vals)
-          val expectedAccum = aggregator.createAccumulator()
-          //The two accumulators should be exactly same
-          validateResult[ACC](expectedAccum, acc)
-        }
-      }
-
-      // iterate over input sets
-      for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-        // 3. test partial merge with an empty accumulator
-        val accumulators: JList[ACC] = new JArrayList[ACC]()
-        accumulators.add(aggregator.createAccumulator())
-
-        val acc = accumulateVals(vals)
-
-        mergeFunc.invoke(aggregator, acc.asInstanceOf[Object], accumulators)
-        val result = aggregator.getValue(acc)
-        validateResult[T](expected, result)
-      }
-    }
-  }
-
-  @Test
-  // test aggregate functions with resetAccumulator
-  def testResetAccumulator(): Unit = {
-
-    if (ifMethodExistInFunction("resetAccumulator", aggregator)) {
-      val resetAccFunc = aggregator.getClass.getMethod("resetAccumulator", accType)
-      // iterate over input sets
-      for ((vals, expected) <- inputValueSets.zip(expectedResults)) {
-        val accumulator = accumulateVals(vals)
-        resetAccFunc.invoke(aggregator, accumulator.asInstanceOf[Object])
-        val expectedAccum = aggregator.createAccumulator()
-        // The accumulator after reset should be exactly the same as a new accumulator
-        validateResult[ACC](expectedAccum, accumulator)
-      }
-    }
-  }
-
-  private def validateResult[T](expected: T, result: T): Unit = {
-    (expected, result) match {
-      case (e: DecimalSumWithRetractAccumulator, r: DecimalSumWithRetractAccumulator) =>
-        // BigDecimal.equals() compares value and scale, but we are only interested in the value.
-        assert(e.f0.compareTo(r.f0) == 0 && e.f1 == r.f1)
-      case (e: DecimalAvgAccumulator, r: DecimalAvgAccumulator) =>
-        // BigDecimal.equals() compares value and scale, but we are only interested in the value.
-        assert(e.f0.compareTo(r.f0) == 0 && e.f1 == r.f1)
-      case (e: BigDecimal, r: BigDecimal) =>
-        // BigDecimal.equals() compares value and scale, but we are only interested in the value.
-        assert(e.compareTo(r) == 0)
-      case _ =>
-        assertEquals(expected, result)
-    }
-  }
-
-  private def accumulateVals(vals: Seq[_]): ACC = {
-    val accumulator = aggregator.createAccumulator()
-    vals.foreach(
-      v =>
-        accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
-    )
-    accumulator
-  }
-
-  private def retractVals(accumulator: ACC, vals: Seq[_]): Unit = {
-    vals.foreach(
-      v => retractFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
-    )
-  }
-}
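
The reflective accumulate/retract/merge calls in the base class above reduce
to plain method calls on a concrete function. A hedged sketch for one of the
functions tested below (signatures assumed from the reflection lookups above):

    import org.apache.flink.table.functions.aggfunctions.IntAvgAggFunction

    val agg = new IntAvgAggFunction
    val acc = agg.createAccumulator()
    agg.accumulate(acc, 1)
    agg.accumulate(acc, 3)
    assert(agg.getValue(acc) == 2) // average of 1 and 3

    agg.retract(acc, 3)
    assert(agg.getValue(acc) == 1) // only the value 1 remains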

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
deleted file mode 100644
index fd510ff..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/functions/aggfunctions/AvgFunctionTest.scala
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.functions.aggfunctions
-
-import java.math.BigDecimal
-import org.apache.flink.table.functions.AggregateFunction
-
-/**
-  * Test base for built-in average aggregate functions
-  *
-  * @tparam T the type for the aggregation result
-  */
-abstract class AvgAggFunctionTestBase[T: Numeric, ACC] extends AggFunctionTestBase[T, ACC] {
-
-  private val numeric: Numeric[T] = implicitly[Numeric[T]]
-
-  def minVal: T
-
-  def maxVal: T
-
-  override def inputValueSets: Seq[Seq[T]] = Seq(
-    Seq(
-      minVal,
-      minVal,
-      null.asInstanceOf[T],
-      minVal,
-      minVal,
-      null.asInstanceOf[T],
-      minVal,
-      minVal,
-      minVal
-    ),
-    Seq(
-      maxVal,
-      maxVal,
-      null.asInstanceOf[T],
-      maxVal,
-      maxVal,
-      null.asInstanceOf[T],
-      maxVal,
-      maxVal,
-      maxVal
-    ),
-    Seq(
-      minVal,
-      maxVal,
-      null.asInstanceOf[T],
-      numeric.fromInt(0),
-      numeric.negate(maxVal),
-      numeric.negate(minVal),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      numeric.fromInt(1),
-      numeric.fromInt(2),
-      null.asInstanceOf[T],
-      numeric.fromInt(3),
-      numeric.fromInt(4),
-      numeric.fromInt(5),
-      null.asInstanceOf[T]
-    ),
-    Seq(
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T],
-      null.asInstanceOf[T]
-    )
-  )
-
-  override def expectedResults: Seq[T] = Seq(
-    minVal,
-    maxVal,
-    numeric.fromInt(0),
-    numeric.fromInt(3),
-    null.asInstanceOf[T]
-  )
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}
-
-class ByteAvgAggFunctionTest extends AvgAggFunctionTestBase[Byte, IntegralAvgAccumulator] {
-
-  override def minVal = (Byte.MinValue + 1).toByte
-
-  override def maxVal = (Byte.MaxValue - 1).toByte
-
-  override def aggregator = new ByteAvgAggFunction()
-}
-
-class ShortAvgAggFunctionTest extends AvgAggFunctionTestBase[Short, IntegralAvgAccumulator] {
-
-  override def minVal = (Short.MinValue + 1).toShort
-
-  override def maxVal = (Short.MaxValue - 1).toShort
-
-  override def aggregator = new ShortAvgAggFunction()
-}
-
-class IntAvgAggFunctionTest extends AvgAggFunctionTestBase[Int, IntegralAvgAccumulator] {
-
-  override def minVal = Int.MinValue + 1
-
-  override def maxVal = Int.MaxValue - 1
-
-  override def aggregator = new IntAvgAggFunction()
-}
-
-class LongAvgAggFunctionTest extends AvgAggFunctionTestBase[Long, BigIntegralAvgAccumulator] {
-
-  override def minVal = Long.MinValue + 1
-
-  override def maxVal = Long.MaxValue - 1
-
-  override def aggregator = new LongAvgAggFunction()
-}
-
-class FloatAvgAggFunctionTest extends AvgAggFunctionTestBase[Float, FloatingAvgAccumulator] {
-
-  override def minVal = Float.MinValue
-
-  override def maxVal = Float.MaxValue
-
-  override def aggregator = new FloatAvgAggFunction()
-}
-
-class DoubleAvgAggFunctionTest extends AvgAggFunctionTestBase[Double, FloatingAvgAccumulator] {
-
-  override def minVal = Float.MinValue
-
-  override def maxVal = Float.MaxValue
-
-  override def aggregator = new DoubleAvgAggFunction()
-}
-
-class DecimalAvgAggFunctionTest extends AggFunctionTestBase[BigDecimal, DecimalAvgAccumulator] {
-
-  override def inputValueSets: Seq[Seq[_]] = Seq(
-    Seq(
-      new BigDecimal("987654321000000"),
-      new BigDecimal("-0.000000000012345"),
-      null,
-      new BigDecimal("0.000000000012345"),
-      new BigDecimal("-987654321000000"),
-      null,
-      new BigDecimal("0")
-    ),
-    Seq(
-      new BigDecimal("987654321000000"),
-      new BigDecimal("-0.000000000012345"),
-      null,
-      new BigDecimal("0.000000000012345"),
-      new BigDecimal("-987654321000000"),
-      null,
-      new BigDecimal("5")
-    ),
-    Seq(
-      null,
-      null,
-      null,
-      null
-    )
-  )
-
-  override def expectedResults: Seq[BigDecimal] = Seq(
-    BigDecimal.ZERO,
-    BigDecimal.ONE,
-    null
-  )
-
-  override def aggregator: AggregateFunction[BigDecimal, DecimalAvgAccumulator] =
-    new DecimalAvgAggFunction()
-
-  override def retractFunc = aggregator.getClass.getMethod("retract", accType, classOf[Any])
-}