Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:23 UTC

[14/44] flink git commit: [FLINK-6617][table] Improve JAVA and SCALA logical plans consistent test

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala
new file mode 100644
index 0000000..6ca607b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+/**
+  * Tests for batch aggregation logical plans.
+  */
+class AggregationsTest extends TableTestBase {
+
+  @Test
+  def testGroupAggregateWithFilter(): Unit = {
+
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.groupBy('a)
+      .select('a, 'a.avg, 'b.sum, 'c.count)
+      .where('a === 1)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      calcNode,
+      term("groupBy", "a"),
+      term("select",
+        "a",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregate(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+    val resultTable = sourceTable.select('a.avg, 'b.sum, 'c.count)
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      batchTableNode(0),
+      tuples(List(null, null, null)),
+      term("values", "a", "b", "c")
+    )
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+    util.verifyTable(resultTable, expected)
+  }
+
+  @Test
+  def testAggregateWithFilter(): Unit = {
+    val util = batchTestUtil()
+    val sourceTable = util.addTable[(Int, Long, Int)]("MyTable", 'a, 'b, 'c)
+
+    val resultTable = sourceTable.select('a, 'b, 'c).where('a === 1)
+      .select('a.avg, 'b.sum, 'c.count)
+
+    val calcNode = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      // ReduceExpressionsRule forcibly adds a cast to the Project node
+      // if the input of the Project node contains a constant expression.
+      term("select", "CAST(1) AS a", "b", "c"),
+      term("where", "=(a, 1)")
+    )
+
+    val setValues = unaryNode(
+      "DataSetValues",
+      calcNode,
+      tuples(List(null, null, null)),
+      term("values", "a", "b", "c")
+    )
+
+    val union = unaryNode(
+      "DataSetUnion",
+      setValues,
+      term("union","a","b","c")
+    )
+
+    val expected = unaryNode(
+      "DataSetAggregate",
+      union,
+      term("select",
+        "AVG(a) AS TMP_0",
+        "SUM(b) AS TMP_1",
+        "COUNT(c) AS TMP_2")
+    )
+
+    util.verifyTable(resultTable, expected)
+  }
+}
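
The new plan tests in this commit all follow the same TableTestBase pattern: build a Table API query against a registered test table, describe the expected optimized plan with the unaryNode/batchTableNode/term helpers, and compare the two with verifyTable. Below is a minimal sketch of that pattern, condensed from the tests in this commit; the class name SimplePlanSketchTest is illustrative only, and the expected plan string mirrors ExpressionReductionTest#testReduceFilterExpression further down in this commit.

package org.apache.flink.table.api.scala.batch.table

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._
import org.apache.flink.table.utils.TableTestBase
import org.apache.flink.table.utils.TableTestUtil._
import org.junit.Test

class SimplePlanSketchTest extends TableTestBase {

  @Test
  def testFilterPlan(): Unit = {
    val util = batchTestUtil()
    // Register a three-column test table, as the tests in this commit do.
    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)

    // The Table API query whose translated logical plan is checked.
    val result = table.where('a > 8)

    // Expected plan: a single DataSetCalc on top of the table scan that
    // keeps all columns and applies the predicate.
    val expected = unaryNode(
      "DataSetCalc",
      batchTableNode(0),
      term("select", "a", "b", "c"),
      term("where", ">(a, 8)")
    )

    util.verifyTable(result, expected)
  }
}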

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
deleted file mode 100644
index b78dd91..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import java.sql.{Date, Time, Timestamp}
-import java.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CalcITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectRenameAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSelectStar(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-
-    val expected = "\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnStringTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val filterDs = ds.filter( 'c.like("%world%") )
-
-    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
-      "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
-      "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
-      "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a < 2 || 'a > 20)
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testConsecutiveFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
-      "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterBasicType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
-
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnCustomType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
-      .filter( 's.like("%a%") )
-
-    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleCalc(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 7)
-        .select('_1, '_3)
-
-    val expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
-      "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n"
-      val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcWithTwoFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 7 && '_2 === 3)
-        .select('_1, '_3)
-        .where('_1 === 4)
-        .select('_1)
-
-    val expected = "4\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcWithAggregation(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1, '_2, '_3)
-        .where('_1 < 15)
-        .groupBy('_2)
-        .select('_1.min, '_2.count as 'cnt)
-        .where('cnt > 3)
-
-    val expected = "7,4\n" + "11,4\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCalcJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.select('a, 'b).join(ds2).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
-      .where('b > 1).select('a, 'd).where('d === 2)
-
-    val expected = "2,2\n" + "3,2\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAdvancedDataTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env
-      .fromElements((
-        BigDecimal("78.454654654654654").bigDecimal,
-        BigDecimal("4E+9999").bigDecimal,
-        Date.valueOf("1984-07-12"),
-        Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24")))
-      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c, 'd, 'e, BigDecimal("11.2"), BigDecimal("11.2").bigDecimal,
-        Date.valueOf("1984-07-12"), Time.valueOf("14:34:24"),
-        Timestamp.valueOf("1984-07-12 14:34:24"))
-
-    val expected = "78.454654654654654,4E+9999,1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
-      "11.2,11.2,1984-07-12,14:34:24,1984-07-12 14:34:24.0"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedScalarFunction() {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    tableEnv.registerFunction("hashCode", OldHashCode)
-    tableEnv.registerFunction("hashCode", HashCode)
-    val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text)
-    val result = table.select("text.hashCode()")
-    val results = result.toDataSet[Row].collect()
-    val expected = "97\n98\n99"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
-
-object CalcITCase {
-
-  @Parameterized.Parameters(name = "Table config = {0}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](
-      Array(TableProgramsTestBase.DEFAULT),
-      Array(TableProgramsTestBase.NO_NULL)).asJava
-  }
-}
-
-object HashCode extends ScalarFunction {
-  def eval(s: String): Int = s.hashCode
-}
-
-object OldHashCode extends ScalarFunction {
-  def eval(s: String): Int = -1
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
deleted file mode 100644
index 0076b8e..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.api.Types._
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
-import org.apache.flink.types.Row
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CastingITCase(configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testNumericAutocastInArithmetic() {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements(
-      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
-      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
-        '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
-
-    val results = table.toDataSet[Row].collect()
-    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
-    compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  @throws[Exception]
-  def testNumericAutocastInComparison() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements(
-      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
-      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
-      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
-      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
-
-    val results = table.toDataSet[Row].collect()
-    val expected: String = "2,2,2,2,2.0,2.0"
-    compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  @throws[Exception]
-  def testCasting() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
-      .select(
-        // * -> String
-      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
-        // NUMERIC TYPE -> Boolean
-      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
-        // NUMERIC TYPE -> NUMERIC TYPE
-      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
-        // Boolean -> NUMERIC TYPE
-      '_4.cast(DOUBLE),
-        // identity casting
-      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
-
-    val results = table.toDataSet[Row].collect()
-    val expected = "1,0.0,1,true," + "true,false,true," +
-      "1.0,0,1," + "1.0," + "1,0.0,1,true\n"
-    compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  @throws[Exception]
-  def testCastFromString() {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tableEnv = TableEnvironment.getTableEnvironment(env)
-    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
-      .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
-        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
-
-    val results = table.toDataSet[Row].collect()
-    val expected = "1,1,1,1,2.0,2.0,true\n"
-    compareResultAsText(results.asJava, expected)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala
new file mode 100644
index 0000000..ece9acc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CompositeFlatteningTest.scala
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.scala.createTypeInformation
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.table.CompositeFlatteningTest.{TestCaseClass, giveMeCaseClass}
+import org.apache.flink.table.functions.ScalarFunction
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+
+class CompositeFlatteningTest extends TableTestBase {
+
+  @Test
+  def testMultipleFlatteningsTable(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[((Int, Long), (String, Boolean), String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table.select('a.flatten(), 'c, 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "c",
+        "b._1 AS b$_1",
+        "b._2 AS b$_2"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testNestedFlattening(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[((((String, TestCaseClass), Boolean), String), String)]("MyTable", 'a, 'b)
+
+    val result = table.select('a.flatten(), 'b.flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "a._1 AS a$_1",
+        "a._2 AS a$_2",
+        "b"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testScalarFunctionAccess(): Unit = {
+    val util = batchTestUtil()
+    val table = util
+      .addTable[(String, Int)]("MyTable", 'a, 'b)
+
+    val result = table.select(
+      giveMeCaseClass().get("my"),
+      giveMeCaseClass().get("clazz"),
+      giveMeCaseClass().flatten())
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        s"${giveMeCaseClass.functionIdentifier}().my AS _c0",
+        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c1",
+        s"${giveMeCaseClass.functionIdentifier}().my AS _c2",
+        s"${giveMeCaseClass.functionIdentifier}().clazz AS _c3"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}
+
+object CompositeFlatteningTest {
+
+  case class TestCaseClass(my: String, clazz: Int)
+
+  object giveMeCaseClass extends ScalarFunction {
+    def eval(): TestCaseClass = {
+      TestCaseClass("hello", 42)
+    }
+
+    override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
+      createTypeInformation[TestCaseClass]
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala
new file mode 100644
index 0000000..4a38741
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExplainTest.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.test.util.MultipleProgramsTestBase
+import org.junit.Assert.assertEquals
+import org.junit._
+
+class ExplainTest
+  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
+
+  val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
+
+  @Test
+  def testFilterWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testFilterWithExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements((1, "hello"))
+      .toTable(tEnv, 'a, 'b)
+      .filter("a % 2 = 0")
+
+    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testJoinWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+    val table = table1.join(table2).where("b = d").select("a, c")
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testJoinWithExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
+    val table = table1.join(table2).where("b = d").select("a, c")
+
+    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testUnionWithoutExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+  @Test
+  def testUnionWithExtended() : Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
+    val table = table1.unionAll(table2)
+
+    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
+    val source = scala.io.Source.fromFile(testFilePath +
+      "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
+    assertEquals(source, result)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala
new file mode 100644
index 0000000..c35a454
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/ExpressionReductionTest.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.utils.TableTestBase
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.Test
+
+class ExpressionReductionTest extends TableTestBase {
+
+  @Test
+  def testReduceCalcExpression(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      ),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceProjectExpression(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .select((3 + 4).toExpr + 6,
+              (11 === 1) ? ("a", "b"),
+              " STRING ".trim,
+              "test" + "string",
+              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
+              1.isNull,
+              "TEST".like("%EST"),
+              2.5.toExpr.floor(),
+              true.cast(Types.STRING) + "X")
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select",
+        "13 AS _c0",
+        "'b' AS _c1",
+        "'STRING' AS _c2",
+        "'teststring' AS _c3",
+        "1990-10-24 23:00:01.123 AS _c4",
+        "false AS _c5",
+        "true AS _c6",
+        "2E0 AS _c7",
+        "'trueX' AS _c8"
+      )
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testReduceFilterExpression(): Unit = {
+    val util = batchTestUtil()
+    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
+
+    val result = table
+      .where('a > (1 + 7))
+
+    val expected = unaryNode(
+      "DataSetCalc",
+      batchTableNode(0),
+      term("select", "a", "b", "c"),
+      term("where", ">(a, 8)")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
index 93e25f8..a174c8b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/FieldProjectionTest.scala
@@ -20,9 +20,8 @@ package org.apache.flink.table.api.scala.batch.table
 import org.apache.flink.api.scala._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.batch.table.FieldProjectionTest._
-import org.apache.flink.table.expressions.{Upper, WindowReference}
+import org.apache.flink.table.expressions.Upper
 import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.plan.logical.TumblingGroupWindow
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{TableTestBase, _}
 import org.junit.Test
@@ -33,12 +32,9 @@ import org.junit.Test
   */
 class FieldProjectionTest extends TableTestBase {
 
-  val util: BatchTableTestUtil = batchTestUtil()
-
-  val streamUtil: StreamTableTestUtil = streamTestUtil()
-
   @Test
   def testSimpleSelect(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.select('a, 'b)
 
@@ -53,6 +49,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectAllFields(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable1 = sourceTable.select('*)
     val resultTable2 = sourceTable.select('a, 'b, 'c, 'd)
@@ -65,6 +62,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectAggregation(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.select('a.sum, 'b.max)
 
@@ -92,9 +90,10 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectFunction(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
 
-    util.tEnv.registerFunction("hashCode", MyHashCode)
+    util.tableEnv.registerFunction("hashCode", MyHashCode)
 
     val resultTable = sourceTable.select("hashCode(c), b")
 
@@ -109,6 +108,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectFromGroupedTable(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.groupBy('a, 'c).select('a)
 
@@ -131,6 +131,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectAllFieldsFromGroupedTable(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.groupBy('a, 'c).select('a, 'c)
 
@@ -149,6 +150,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectAggregationFromGroupedTable(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.groupBy('c).select('a.sum)
 
@@ -173,6 +175,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectFromGroupedTableWithNonTrivialKey(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.groupBy(Upper('c) as 'k).select('a.sum)
 
@@ -197,6 +200,7 @@ class FieldProjectionTest extends TableTestBase {
 
   @Test
   def testSelectFromGroupedTableWithFunctionKey(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd)
     val resultTable = sourceTable.groupBy(MyHashCode('c) as 'k).select('a.sum)
 
@@ -220,67 +224,8 @@ class FieldProjectionTest extends TableTestBase {
   }
 
   @Test
-  def testSelectFromStreamingWindow(): Unit = {
-    val sourceTable = streamUtil
-      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
-    val resultTable = sourceTable
-        .window(Tumble over 5.millis on 'rowtime as 'w)
-        .groupBy('w)
-        .select(Upper('c).count, 'a.sum)
-
-    val expected =
-      unaryNode(
-        "DataStreamGroupWindowAggregate",
-        unaryNode(
-          "DataStreamCalc",
-          streamTableNode(0),
-          term("select", "c", "a", "rowtime", "UPPER(c) AS $f3")
-        ),
-        term("window",
-          TumblingGroupWindow(
-            WindowReference("w"),
-            'rowtime,
-            5.millis)),
-        term("select", "COUNT($f3) AS TMP_0", "SUM(a) AS TMP_1")
-      )
-
-    streamUtil.verifyTable(resultTable, expected)
-  }
-
-  @Test
-  def testSelectFromStreamingGroupedWindow(): Unit = {
-    val sourceTable = streamUtil
-      .addTable[(Int, Long, String, Double)]("MyTable", 'a, 'b, 'c, 'd, 'rowtime.rowtime)
-    val resultTable = sourceTable
-        .window(Tumble over 5.millis on 'rowtime as 'w)
-        .groupBy('w, 'b)
-        .select(Upper('c).count, 'a.sum, 'b)
-
-    val expected = unaryNode(
-        "DataStreamCalc",
-        unaryNode(
-          "DataStreamGroupWindowAggregate",
-          unaryNode(
-            "DataStreamCalc",
-            streamTableNode(0),
-            term("select", "c", "a", "b", "rowtime", "UPPER(c) AS $f4")
-          ),
-          term("groupBy", "b"),
-          term("window",
-            TumblingGroupWindow(
-              WindowReference("w"),
-              'rowtime,
-              5.millis)),
-          term("select", "b", "COUNT($f4) AS TMP_0", "SUM(a) AS TMP_1")
-        ),
-        term("select", "TMP_0", "TMP_1", "b")
-    )
-
-    streamUtil.verifyTable(resultTable, expected)
-  }
-
-  @Test
   def testSelectFromAggregatedPojoTable(): Unit = {
+    val util = batchTestUtil()
     val sourceTable = util.addTable[WC]("MyTable", 'word, 'frequency)
     val resultTable = sourceTable
       .groupBy('word)

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
index 12e8897..84a6738 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/GroupWindowTest.scala
@@ -18,9 +18,8 @@
 package org.apache.flink.table.api.scala.batch.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMerge
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.api.ValidationException
-import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMerge}
 import org.apache.flink.table.expressions.WindowReference
 import org.apache.flink.table.plan.logical._
 import org.apache.flink.table.utils.TableTestBase
@@ -33,84 +32,6 @@ class GroupWindowTest extends TableTestBase {
   // Common test
   //===============================================================================================
 
-  /**
-    * OVER clause is necessary for [[OverAgg0]] window function.
-    */
-  @Test(expected = classOf[ValidationException])
-  def testOverAggregation(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-    val overAgg = new OverAgg0
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('string,'w)
-      .select(overAgg('long, 'int))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupByWithoutWindowAlias(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('string)
-      .select('string, 'int.count)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidRowTimeRef(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    table
-      .window(Tumble over 5.milli on 'long as 'w)
-      .groupBy('w, 'string)
-      .select('string, 'int.count)
-      .window(Slide over 5.milli every 1.milli on 'int as 'w2) // 'Int  does not exist in input.
-      .groupBy('w2)
-      .select('string)
-  }
-
-  //===============================================================================================
-  // Tumbling Windows
-  //===============================================================================================
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidProcessingTimeDefinition(): Unit = {
-    val util = batchTestUtil()
-    // proctime is not allowed
-    util.addTable[(Long, Int, String)]('long.proctime, 'int, 'string)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidProcessingTimeDefinition2(): Unit = {
-    val util = batchTestUtil()
-    // proctime is not allowed
-    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidEventTimeDefinition(): Unit = {
-    val util = batchTestUtil()
-    // definition must not extend schema
-    util.addTable[(Long, Int, String)]('long, 'int, 'string, 'rowtime.rowtime)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Tumble over 2.minutes on 'rowtime as 'w)
-      .groupBy('w, 'long)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
   @Test
   def testEventTimeTumblingGroupWindowOverCount(): Unit = {
     val util = batchTestUtil()
@@ -176,20 +97,6 @@ class GroupWindowTest extends TableTestBase {
     util.verifyTable(windowedTable, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testAllTumblingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Tumble over 2.minutes on 'rowtime as 'w)
-      .groupBy('w)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
   @Test
   def testAllEventTimeTumblingGroupWindowOverTime(): Unit = {
     val util = batchTestUtil()
@@ -242,20 +149,6 @@ class GroupWindowTest extends TableTestBase {
   // Sliding Windows
   //===============================================================================================
 
-  @Test(expected = classOf[ValidationException])
-  def testSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Slide over 2.minutes every 1.minute on 'rowtime as 'w)
-      .groupBy('w, 'long)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
   @Test
   def testEventTimeSlidingGroupWindowOverTime(): Unit = {
     val util = batchTestUtil()
@@ -324,20 +217,6 @@ class GroupWindowTest extends TableTestBase {
     util.verifyTable(windowedTable, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testAllSlidingGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Slide over 2.minutes every 1.minute on 'long as 'w)
-      .groupBy('w)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
   @Test
   def testAllEventTimeSlidingGroupWindowOverTime(): Unit = {
     val util = batchTestUtil()
@@ -436,32 +315,4 @@ class GroupWindowTest extends TableTestBase {
     util.verifyTable(windowedTable, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Session withGap 2.minutes on 'rowtime as 'w)
-      .groupBy('w, 'long)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testAllSessionGroupWindowWithInvalidUdAggArgs(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string)
-
-    val myWeightedAvg = new WeightedAvgWithMerge
-
-    table
-      .window(Session withGap 2.minutes on 'rowtime as 'w)
-      .groupBy('w)
-      // invalid function arguments
-      .select(myWeightedAvg('int, 'string))
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
deleted file mode 100644
index 8085a3c..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
-import org.apache.flink.table.expressions.Literal
-import org.apache.flink.table.utils.TableFunc2
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithFilter(): Unit = {
-
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'b < 2).select('c, 'g)
-
-    val expected = "Hi,Hallo\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithJoinFilter(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n" +
-      "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
-
-    val results = joinT.toDataSet[Row].collect()
-    val expected = "Hello world, how are you?,Hallo Welt wie\n" + "I am fine.,Hallo Welt wie\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-    "Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    // use a different table environment so that the temporary table ids are the same
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).where('a === 'd).select('g.count)
-
-    val expected = "6"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithGroupedAggregation(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2)
-      .where('a === 'd)
-      .groupBy('a, 'd)
-      .select('b.sum, 'g.count)
-
-    val expected = "6,3\n" + "4,2\n" + "1,1"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinPushThroughJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'j, 'k, 'l)
-
-    val joinT = ds1.join(ds2)
-      .where(Literal(true))
-      .join(ds3)
-      .where('a === 'd && 'e === 'k)
-      .select('a, 'f, 'l)
-
-    val expected = "2,1,Hello\n" + "2,1,Hello world\n" + "1,0,Hi"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithDisjunctivePred(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" +
-      "Hello,Hallo Welt\n" +
-      "I am fine.,IJK"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testJoinWithExpressionPreds(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.join(ds2).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
-
-    val expected = "I am fine.,Hallo Welt\n" +
-      "Luke Skywalker,Hallo Welt wie gehts?\n" +
-      "Luke Skywalker,ABC\n" +
-      "Comment#2,HIJ\n" +
-      "Comment#2,IJK"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testLeftJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.leftOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-      "Hello world,ABC\n" + "Hello world, how are you?,null\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "Luke Skywalker,null\n" + "Comment#1,null\n" + "Comment#2,null\n" +
-      "Comment#3,null\n" + "Comment#4,null\n" + "Comment#5,null\n" + "Comment#6,null\n" +
-      "Comment#7,null\n" + "Comment#8,null\n" + "Comment#9,null\n" + "Comment#10,null\n" +
-      "Comment#11,null\n" + "Comment#12,null\n" + "Comment#13,null\n" + "Comment#14,null\n" +
-      "Comment#15,null\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRightJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-      "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
-      "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLeftJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFullJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testRightJoinWithNonEquiJoinPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testLeftJoinWithLocalPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.leftOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFullJoinWithLocalPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.fullOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testRightJoinWithLocalPredicate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 3).select('c, 'g).toDataSet[Row].collect()
-  }
-
-  @Test
-  def testFullOuterJoinWithMultipleKeys(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds1.fullOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
-
-    val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
-      "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
-      "null,DEF\n" + "null,EFG\n" + "null,FGH\n" + "null,GHI\n" + "I am fine.,HIJ\n" +
-      "I am fine.,IJK\n" + "null,JKL\n" + "null,KLM\n" + "Luke Skywalker,null\n" +
-      "Comment#1,null\n" + "Comment#2,null\n" + "Comment#3,null\n" + "Comment#4,null\n" +
-      "Comment#5,null\n" + "Comment#6,null\n" + "Comment#7,null\n" + "Comment#8,null\n" +
-      "Comment#9,null\n" + "Comment#10,null\n" + "Comment#11,null\n" + "Comment#12,null\n" +
-      "Comment#13,null\n" + "Comment#14,null\n" + "Comment#15,null\n" +
-      "Hello world, how are you?,null\n"
-    val results = joinT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUDTFJoinOnTuples(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List("hi#world", "how#are#you")
-
-    val ds1 = env.fromCollection(data).toTable(tEnv, 'a)
-    val func2 = new TableFunc2
-
-    val joinDs = ds1.join(func2('a) as ('name, 'len))
-
-    val results = joinDs.toDataSet[Row].collect()
-    val expected = Seq(
-      "hi#world,hi,2",
-      "hi#world,world,5",
-      "how#are#you,how,3",
-      "how#are#you,are,3",
-      "how#are#you,you,3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}
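
For context, the removed testUDTFJoinOnTuples joins the input table against TableFunc2 from the flink-table test utilities, which splits each string on '#' and emits one (token, length) row per token, as the expected "hi#world,hi,2" rows show. A minimal sketch of such a table function is given below; the bundled TableFunc2 may be defined differently (for example with an explicit Row result type), so this is an illustrative stand-in, not the actual test utility.

    import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
    import org.apache.flink.table.functions.TableFunction

    // Hypothetical stand-in for TableFunc2: splits its input on '#'
    // and emits one (token, length) row per token.
    class SplitWithLength extends TableFunction[JTuple2[String, Integer]] {
      def eval(str: String): Unit = {
        str.split("#").foreach { token =>
          collect(new JTuple2[String, Integer](token, Integer.valueOf(token.length)))
        }
      }
    }

Used the same way as func2 in the removed test (val split = new SplitWithLength; ds1.join(split('a) as ('name, 'len))), it yields the token and length columns next to the original string.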

http://git-wip-us.apache.org/repos/asf/flink/blob/ecde7bc1/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
deleted file mode 100644
index 67da532..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.table.api.scala._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.types.Row
-import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-@RunWith(classOf[Parameterized])
-class SetOperatorsITCase(
-    configMode: TableConfigMode)
-  extends TableProgramsCollectionTestBase(configMode) {
-
-  @Test
-  def testUnionAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.unionAll(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f)
-
-    val unionDs = ds1.union(ds2).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnionAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.unionAll(ds2).unionAll(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n" +
-      "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTernaryUnion(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds3 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val unionDs = ds1.union(ds2).union(ds3).select('c)
-
-    val results = unionDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinusAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minusAll(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hi\n" +
-      "Hello\n" + "Hello world\n" +
-      "Hello\n" + "Hello world\n" +
-      "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinus(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'a, 'b, 'c)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minus(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testMinusDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = env.fromElements((1, 1L, "Hi")).toTable(tEnv, 'd, 'e, 'f)
-
-    val minusDs = ds1.unionAll(ds1).unionAll(ds1)
-      .minus(ds2.unionAll(ds2)).select('c)
-
-    val results = minusDs.toDataSet[Row].collect()
-    val expected = "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersect(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val data = new mutable.MutableList[(Int, Long, String)]
-    data.+=((1, 1L, "Hi"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((2, 2L, "Hello"))
-    data.+=((3, 2L, "Hello world!"))
-    val ds2 = env.fromCollection(Random.shuffle(data)).toTable(tEnv, 'a, 'b, 'c)
-
-    val intersectDS = ds1.intersect(ds2).select('c).toDataSet[Row]
-
-    val results = intersectDS.collect()
-
-    val expected = "Hi\n" + "Hello\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectAll(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data1 = new mutable.MutableList[Int]
-    data1 += (1, 1, 1, 2, 2)
-    val data2 = new mutable.MutableList[Int]
-    data2 += (1, 2, 2, 2, 3)
-    val ds1 = env.fromCollection(data1).toTable(tEnv, 'c)
-    val ds2 = env.fromCollection(data2).toTable(tEnv, 'c)
-
-    val intersectDS = ds1.intersectAll(ds2).select('c).toDataSet[Row]
-
-    val expected = "1\n2\n2"
-    val results = intersectDS.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectWithDifferentFieldNames(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'e, 'f, 'g)
-
-    val intersectDs = ds1.intersect(ds2).select('c)
-
-    val results = intersectDs.toDataSet[Row].collect()
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testIntersectWithScalarExpression(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a + 1, 'b, 'c)
-    val ds2 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a + 1, 'b, 'c)
-
-    val intersectDs = ds1.intersect(ds2)
-
-    val results = intersectDs.toDataSet[Row].collect()
-    val expected = "2,1,Hi\n" + "3,2,Hello\n" + "4,2,Hello world\n"
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
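
As a quick sanity check on the removed testMinusAll expectation: minusAll follows EXCEPT ALL multiset semantics, keeping max(n - m, 0) copies of each record, where n is its multiplicity on the left input and m on the right. Unioning ds1 with itself twice gives n = 3 for every record of the small 3-tuple data set; ds2 unionAll ds2 gives m = 2 for "Hi" and m = 0 for everything else, so the result keeps 3 - 2 = 1 copy of "Hi" and 3 copies each of "Hello" and "Hello world", which is exactly the expected string in that test.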