You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2016/09/29 13:47:48 UTC

[1/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases

Repository: flink
Updated Branches:
  refs/heads/master 8243138c3 -> 7758571ae


http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
deleted file mode 100644
index 8889b37..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/GroupedAggregationsITCase.scala
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class GroupedAggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
-  @Test
-  def testGroupedAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-
-    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupingKeyForwardIfNotUsed(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum)
-
-    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupNoAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('a.sum as 'd, 'b)
-      .groupBy('b, 'd)
-      .select('b)
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithLongKeys(): Unit = {
-    // This uses very long keys to force serialized comparison.
-    // With short keys, the normalized key is sufficient.
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = env.fromElements(
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
-      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
-      .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      .select('c.sum)
-
-    val expected = "10\n" + "8\n"
-    val results = ds.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant1(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 4 as 'four, 'b)
-      .groupBy('four, 'a)
-      .select('four, 'b.sum)
-
-    val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
-      "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
-      "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
-    val results = t.toDataSet[Row].collect()
-
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithConstant2(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-        .select('b, 4 as 'four, 'a)
-        .groupBy('b, 'four)
-        .select('four, 'a.sum)
-
-    val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithExpression(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-        .groupBy('e, 'b % 3)
-        .select('c.min, 'e, 'a.avg, 'd.count)
-
-    val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testGroupedAggregateWithFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('b)
-      .select('b, 'a.sum)
-      .where('b === 2)
-
-    val expected = "2,5\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
index f6e6081..67cac14 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/JoinITCase.scala
@@ -19,12 +19,14 @@
 package org.apache.flink.api.scala.batch.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableException, ValidationException, Row, TableEnvironment}
 import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -32,12 +34,15 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class JoinITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -53,7 +58,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   def testJoinWithFilter(): Unit = {
 
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('d, 'e, 'f, 'g, 'h)
@@ -68,7 +73,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithJoinFilter(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -84,7 +89,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -100,7 +105,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinNonExistingKey(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -114,7 +119,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinWithNonMatchingKeyTypes(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -128,7 +133,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinWithAmbiguousFields(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
@@ -142,7 +147,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate1(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -156,7 +161,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate2(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -170,7 +175,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -185,7 +190,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithGroupedAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -203,7 +208,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinPushThroughJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -223,7 +228,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithDisjunctivePred(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -240,7 +245,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testJoinWithExpressionPreds(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -259,8 +264,8 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testJoinTablesFromDifferentEnvs(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
@@ -272,7 +277,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testLeftJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -294,7 +299,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testNoJoinCondition(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -306,7 +311,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test(expected = classOf[ValidationException])
   def testNoEquiJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -318,7 +323,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testRightJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -337,7 +342,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testRightJoinWithNotOnlyEquiJoin(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
@@ -353,7 +358,7 @@ class JoinITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode)
   @Test
   def testFullOuterJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     tEnv.getConfig.setNullCheck(true)
 
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
deleted file mode 100644
index 1143afd..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/SelectITCase.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectRenameAll(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
-      .select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
-  @Test
-  def testSelectStar(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAliasStarException(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
deleted file mode 100644
index 84bdbb0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/ToTableITCase.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ToTableITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testToTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "Peter,28,4000.0,Sales\n" +
-      "Anna,56,10000.0,Engineering\n" +
-      "Lucy,42,6000.0,HR\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromAndToCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
-      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
-      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
-    val results = t.toDataSet[SomeCaseClass].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToFewFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToManyFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithAmbiguousFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Field names not unique.
-      .toTable(tEnv, 'a, 'b, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a + 1, 'b, 'c)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a as 'foo, 'b, 'c)
-  }
-
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-  def this() { this("", 0, 0.0, "") }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
index 772850d..2ce42d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/utils/TableProgramsTestBase.scala
@@ -20,8 +20,7 @@ package org.apache.flink.api.scala.batch.utils
 
 import java.util
 
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.{EFFICIENT, NO_NULL, TableConfigMode}
 import org.apache.flink.api.table.TableConfig
 import org.apache.flink.test.util.MultipleProgramsTestBase
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
@@ -37,7 +36,7 @@ class TableProgramsTestBase(
   def config: TableConfig = {
     val conf = new TableConfig
     tableConfigMode match {
-      case NULL =>
+      case NO_NULL =>
         conf.setNullCheck(false)
       case EFFICIENT =>
         conf.setEfficientTypeUsage(true)
@@ -48,21 +47,14 @@ class TableProgramsTestBase(
 }
 
 object TableProgramsTestBase {
-  sealed trait TableConfigMode { def nullCheck: Boolean; def efficientTypes: Boolean }
-  object TableConfigMode {
-    case object DEFAULT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = false
-    }
-    case object NULL extends TableConfigMode {
-      val nullCheck = true; val efficientTypes = false
-    }
-    case object EFFICIENT extends TableConfigMode {
-      val nullCheck = false; val efficientTypes = true
-    }
-  }
+  case class TableConfigMode(nullCheck: Boolean, efficientTypes: Boolean)
+
+  val DEFAULT = TableConfigMode(nullCheck = true, efficientTypes = false)
+  val NO_NULL = TableConfigMode(nullCheck = false, efficientTypes = false)
+  val EFFICIENT = TableConfigMode(nullCheck = false, efficientTypes = true)
 
   @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
   def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, TableConfigMode.DEFAULT))
+    Seq[Array[AnyRef]](Array(TestExecutionMode.COLLECTION, DEFAULT))
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
new file mode 100644
index 0000000..578ad30
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/CalcITCase.scala
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.stream.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.mutable
+
+class CalcITCase extends StreamingMultipleProgramsTestBase {
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSelectFirst(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("1", "2", "3")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+
+    // verify ProjectMergeRule.
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
+      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
+      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithToFewFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithToManyFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testAsWithAmbiguousFields(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+
+  @Test(expected = classOf[TableException])
+  def testOnlyFieldRefInAs(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
+
+    val results = ds.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("no")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testSimpleFilter(): Unit = {
+    /*
+     * Test simple filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a === 3)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList("3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    /*
+     * Test all-rejecting filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    assertEquals(true, StreamITCase.testResults.isEmpty)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    /*
+     * Test all-passing filter
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+        "1,1,Hi",
+        "2,2,Hello",
+        "3,2,Hello world")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "2,2,Hello", "4,3,Hello world, how are you?",
+      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
+      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
+      "18,6,Comment#12", "20,6,Comment#14")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    /*
+     * Test filter on Integer tuple field.
+     */
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 !== 0)
+    val results = filterDs.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+    val expected = mutable.MutableList(
+      "1,1,Hi", "3,2,Hello world",
+      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
+      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
+      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala
deleted file mode 100644
index 45b9b04..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/FilterITCase.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class FilterITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleFilter(): Unit = {
-    /*
-     * Test simple filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a === 3)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    /*
-     * Test all-rejecting filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    assertEquals(true, StreamITCase.testResults.isEmpty)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    /*
-     * Test all-passing filter
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "2,2,Hello", "4,3,Hello world, how are you?",
-      "6,3,Luke Skywalker", "8,4,Comment#2", "10,4,Comment#4",
-      "12,5,Comment#6", "14,5,Comment#8", "16,6,Comment#10",
-      "18,6,Comment#12", "20,6,Comment#14")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    /*
-     * Test filter on Integer tuple field.
-     */
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val results = filterDs.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-    val expected = mutable.MutableList(
-      "1,1,Hi", "3,2,Hello world",
-      "5,3,I am fine.", "7,4,Comment#1", "9,4,Comment#3",
-      "11,5,Comment#5", "13,5,Comment#7", "15,5,Comment#9",
-      "17,6,Comment#11", "19,6,Comment#13", "21,6,Comment#15")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
deleted file mode 100644
index c6a2139..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/stream/table/SelectITCase.scala
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.stream.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.stream.utils.{StreamITCase, StreamTestData}
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.{Row, TableEnvironment, TableException}
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
-import org.junit.Assert._
-import org.junit.Test
-
-import scala.collection.mutable
-
-class SelectITCase extends StreamingMultipleProgramsTestBase {
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1, '_2, '_3)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSelectFirst(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).select('_1)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("1", "2", "3")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectWithNaming(): Unit = {
-
-    // verify ProjectMergeRule.
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv)
-      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
-      .select('a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-      "1,1", "2,2", "3,2", "4,3", "5,3", "6,3", "7,4",
-      "8,4", "9,4", "10,4", "11,5", "12,5", "13,5", "14,5", "15,5",
-      "16,6", "17,6", "18,6", "19,6", "20,6", "21,6")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test
-  def testSimpleSelectAllWithAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList(
-        "1,1,Hi",
-        "2,2,Hello",
-        "3,2,Hello world")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithToFewFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithToManyFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testAsWithAmbiguousFields(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b, 'b)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-
-
-  @Test(expected = classOf[TableException])
-  def testOnlyFieldRefInAs(): Unit = {
-
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-    StreamITCase.testResults = mutable.MutableList()
-    val ds = StreamTestData.get3TupleDataStream(env).toTable(tEnv, 'a, 'b as 'c, 'd)
-
-    val results = ds.toDataStream[Row]
-    results.addSink(new StreamITCase.StringSink)
-    env.execute()
-
-    val expected = mutable.MutableList("no")
-    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
-  }
-}


[2/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases

Posted by tw...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
deleted file mode 100644
index 581c8ed..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/SelectITCase.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class SelectITCase extends TableProgramsTestBase {
-
-	public SelectITCase(TestExecutionMode mode, TableConfigMode configMode) {
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testSimpleSelectAllWithAs() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-				.select("a, b, c");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectWithNaming() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-				.select("f0 as a, f1 as b")
-				.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectRenameAll() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-			.select("f0 as a, f1 as b, f2 as c")
-			.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testSelectInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1, foo + 2");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testSelectAmbiguousFieldNames() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1 as foo, b + 2 as foo");
-	}
-
-	@Test
-	public void testSelectStar() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-			.select("*");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-		                  "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-		                  "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-		                  "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-		                  "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-		                  "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-		                  "20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
index c33e1ef..2d82dbc 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.scala.batch
 
+import java.util
+
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
@@ -140,4 +142,131 @@ class TableEnvironmentITCase(
     // Must fail. Table is bound to different TableEnvironment.
     tEnv2.registerTable("MyTable", t1)
   }
+
+  @Test
+  def testToTable(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 'b, 'c)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testToTableFromCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t =  env.fromCollection(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "Peter,28,4000.0,Sales\n" +
+      "Anna,56,10000.0,Engineering\n" +
+      "Lucy,42,6000.0,HR\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testToTableFromAndToCaseClass(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val data = List(
+      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
+      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
+      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
+
+    val t =  env.fromCollection(data)
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+      .select('a, 'b, 'c, 'd)
+
+    val expected: String =
+      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
+      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
+      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
+    val results = t.toDataSet[SomeCaseClass].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithToFewFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
+      .toTable(tEnv, 'a, 'b)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithToManyFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Number of fields does not match.
+      .toTable(tEnv, 'a, 'b, 'c, 'd)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithAmbiguousFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env)
+      // Must fail. Field names not unique.
+      .toTable(tEnv, 'a, 'b, 'b)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithNonFieldReference1(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a + 1, 'b, 'c)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testToTableWithNonFieldReference2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    // Must fail. as() can only have field references
+    CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a as 'foo, 'b, 'c)
+  }
+}
+
+object TableEnvironmentITCase {
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava
+  }
+}
+
+case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
+  def this() { this("", 0, 0.0, "") }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
index 407fa4c..d7e99d4 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
@@ -20,14 +20,15 @@ package org.apache.flink.api.scala.batch
 
 import java.io.File
 
-import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.ExecutionEnvironment
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.TableEnvironment
 import org.apache.flink.api.table.sinks.CsvTableSink
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit.Test
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -35,8 +36,9 @@ import org.junit.runners.Parameterized
 
 @RunWith(classOf[Parameterized])
 class TableSinkITCase(
-    mode: TestExecutionMode)
-  extends MultipleProgramsTestBase(mode) {
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testBatchTableSink(): Unit = {
@@ -46,7 +48,7 @@ class TableSinkITCase(
     val path = tmpFile.toURI.toString
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
     env.setParallelism(4)
 
     val input = CollectionDataSets.get3TupleDataSet(env)

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
index 6fd0d13..08bee72 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
@@ -39,14 +39,16 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class TableSourceITCase(mode: TestExecutionMode)
-  extends MultipleProgramsTestBase(mode) {
+class TableSourceITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testBatchTableSourceTableAPI(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
     val results = tEnv
@@ -65,7 +67,7 @@ class TableSourceITCase(mode: TestExecutionMode)
   def testBatchTableSourceSQL(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
     val results = tEnv.sql(
@@ -100,7 +102,7 @@ class TableSourceITCase(mode: TestExecutionMode)
     tmpWriter.close()
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val csvTable = new CsvTableSource(
       tempFile.getAbsolutePath,

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
new file mode 100644
index 0000000..49a97e3
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.scala.batch.sql
+
+import java.util
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
+import org.apache.flink.api.scala.table._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
+import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.TestBaseUtils
+import org.junit._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class CalcITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
+
+  @Test
+  def testSelectStarFromTable(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSelectStarFromDataSet(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env)
+    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectAll(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT a, b, c FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSelectWithNaming(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidFields(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT a, foo FROM MyTable"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    tEnv.sql(sqlQuery)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE false"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE true"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnString(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnInteger(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDisjunctivePredicate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterWithAnd(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
+    tEnv.registerTable("MyTable", ds)
+
+    val result = tEnv.sql(sqlQuery)
+
+    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+      "19,6,Comment#13\n" + "21,6,Comment#15\n"
+    val results = result.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+}
+
+object CalcITCase {
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala
deleted file mode 100644
index cc4da38..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/FilterITCase.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE false"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE true"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnInteger(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterWithAnd(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
-      "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
deleted file mode 100644
index 07b802d..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/SelectITCase.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{ValidationException, Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class SelectITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSelectStarFromTable(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSelectStarFromDataSet(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, b, c FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSelectWithNaming(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, foo FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    tEnv.sql(sqlQuery)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
index 7c0cdff..16c8ece 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/AggregationsITCase.scala
@@ -19,12 +19,14 @@
 package org.apache.flink.api.scala.batch.table
 
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.examples.scala.WordCountTable.{WC => MyWC}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -32,13 +34,16 @@ import org.junit.runners.Parameterized
 import scala.collection.JavaConverters._
 
 @RunWith(classOf[Parameterized])
-class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
+class AggregationsITCase(
+    mode: TestExecutionMode,
+    configMode: TableConfigMode)
+  extends TableProgramsTestBase(mode, configMode) {
 
   @Test
   def testAggregationTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
       .select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
@@ -52,7 +57,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationOnNonExistingField(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
       // Must fail. Field 'foo does not exist.
@@ -63,7 +68,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
@@ -79,7 +84,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testProjection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements(
       (1: Byte, 1: Short),
@@ -95,7 +100,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationWithArithmetic(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
       .select(('_1 + 2).avg + 2, '_2.count + 5)
@@ -109,7 +114,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationWithTwoCount(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
       .select('_1.count, '_2.count)
@@ -123,7 +128,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testAggregationAfterProjection(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements(
       (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
@@ -140,7 +145,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testNonWorkingAggregationDataTypes(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements(("Hello", 1)).toTable(tEnv)
       // Must fail. Field '_1 is not a numeric type.
@@ -153,7 +158,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testNoNestedAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = env.fromElements(("Hello", 1)).toTable(tEnv)
       // Must fail. Sum aggregation can not be chained.
@@ -164,7 +169,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testSQLStyleAggregations(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
       .select(
@@ -184,7 +189,7 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
   def testPojoAggregation(): Unit = {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val input = env.fromElements(
       MyWC("hello", 1),
@@ -204,5 +209,196 @@ class AggregationsITCase(mode: TestExecutionMode) extends MultipleProgramsTestBa
     TestBaseUtils.compareResultAsText(mappedResult.asJava, expected)
   }
 
+  @Test
+  def testDistinct(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val distinct = ds.select('b).distinct()
+
+    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+    val results = distinct.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+    val distinct = ds.groupBy('a, 'e).select('e).distinct()
+
+    val expected = "1\n" + "2\n" + "3\n"
+    val results = distinct.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnNonExistentField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. '_foo not a valid field
+      .groupBy('_foo)
+      .select('a.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingInvalidSelection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('a, 'b)
+      // must fail. 'c is not a grouping key or aggregation
+      .select('c)
+  }
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum)
+
+    val expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum)
+
+    val expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupNoAggregation(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('a.sum as 'd, 'b)
+      .groupBy('b, 'd)
+      .select('b)
+
+    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedAggregateWithLongKeys(): Unit = {
+    // This uses very long keys to force serialized comparison.
+    // With short keys, the normalized key is sufficient.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = env.fromElements(
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhaa", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2),
+      ("hhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhhab", 1, 2))
+      .rebalance().setParallelism(2).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('a, 'b)
+      .select('c.sum)
+
+    val expected = "10\n" + "8\n"
+    val results = ds.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant1(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .select('a, 4 as 'four, 'b)
+      .groupBy('four, 'a)
+      .select('four, 'b.sum)
+
+    val expected = "4,2\n" + "4,3\n" + "4,5\n" + "4,5\n" + "4,5\n" + "4,6\n" +
+      "4,6\n" + "4,6\n" + "4,3\n" + "4,4\n" + "4,6\n" + "4,1\n" + "4,4\n" +
+      "4,4\n" + "4,5\n" + "4,6\n" + "4,2\n" + "4,3\n" + "4,4\n" + "4,5\n" + "4,6\n"
+    val results = t.toDataSet[Row].collect()
+
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant2(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+        .select('b, 4 as 'four, 'a)
+        .groupBy('b, 'four)
+        .select('four, 'a.sum)
+
+    val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedAggregateWithExpression(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+        .groupBy('e, 'b % 3)
+        .select('c.min, 'e, 'a.avg, 'd.count)
+
+    val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
+        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testGroupedAggregateWithFilter(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('b)
+      .select('b, 'a.sum)
+      .where('b === 2)
+
+    val expected = "2,5\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
index d64e414..4ffda87 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/CalcITCase.scala
@@ -18,14 +18,18 @@
 
 package org.apache.flink.api.scala.batch.table
 
+import java.util
+
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
 import org.apache.flink.api.scala.table._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
+import org.apache.flink.api.table.expressions.Literal
+import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
+import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -39,6 +43,299 @@ class CalcITCase(
   extends TableProgramsTestBase(mode, configMode) {
 
   @Test
+  def testSimpleSelectAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).select('_1, '_2, '_3)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('a, 'b, 'c)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testSimpleSelectRenameAll(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+      .select('a, 'b)
+
+    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectInvalidFieldFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. Field 'foo does not exist
+      .select('a, 'foo)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'foo
+      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'a
+      .select('a, 'b as 'a).toDataSet[Row].print()
+  }
+
+  @Test
+  def testSelectStar(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*)
+
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
+      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
+      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = t.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAliasStarException(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => //ignore
+    }
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(false) )
+
+    val expected = "\n"
+    val results = filterDs.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( Literal(true) )
+    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
+      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
+      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
+      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
+      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
+      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
+    val results = filterDs.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnStringTupleField(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val filterDs = ds.filter( 'c.like("%world%") )
+
+    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
+    val results = filterDs.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 === 0 )
+
+    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
+      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
+      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
+    val results = filterDs.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a % 2 !== 0)
+    val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
+      "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
+      "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
+      "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testDisjunctivePredicate(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter( 'a < 2 || 'a > 20)
+    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testConsecutiveFilters(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
+    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
+      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
+      "19,6,Comment#13\n" + "21,6,Comment#15\n"
+    val results = filterDs.collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterBasicType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.getStringDataSet(env)
+
+    val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
+
+    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
+    val results = filterDs.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testFilterOnCustomType(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.getCustomTypeDataSet(env)
+    val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+      .filter( 's.like("%a%") )
+
+    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
+    val results = filterDs.toDataSet[Row].collect()
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFilterInvalidFieldName(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env, config)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+
+    // must fail. Field 'foo does not exist
+    ds.filter( 'foo === 2 )
+  }
+
+  @Test
   def testSimpleCalc(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -104,3 +401,13 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 }
+
+object CalcITCase {
+
+  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
+      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala
deleted file mode 100644
index 55c7944..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/DistinctITCase.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{Row, TableEnvironment}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class DistinctITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) {
-
-  @Test
-  def testDistinct(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val distinct = ds.select('b).distinct()
-
-    val expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n"
-    val results = distinct.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDistinctAfterAggregate(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val ds = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-    val distinct = ds.groupBy('a, 'e).select('e).distinct()
-
-    val expected = "1\n" + "2\n" + "3\n"
-    val results = distinct.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
deleted file mode 100644
index ee0356f..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/table/FilterITCase.scala
+++ /dev/null
@@ -1,188 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.expressions.Literal
-import org.apache.flink.api.table.{Row, TableEnvironment, ValidationException}
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-
-@RunWith(classOf[Parameterized])
-class FilterITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(false) )
-
-    val expected = "\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( Literal(true) )
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnStringTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val filterDs = ds.filter( 'c.like("%world%") )
-
-    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnIntegerTupleField(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 === 0 )
-
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testNotEquals(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a % 2 !== 0)
-    val expected = "1,1,Hi\n" + "3,2,Hello world\n" +
-      "5,3,I am fine.\n" + "7,4,Comment#1\n" + "9,4,Comment#3\n" +
-      "11,5,Comment#5\n" + "13,5,Comment#7\n" + "15,5,Comment#9\n" +
-      "17,6,Comment#11\n" + "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter( 'a < 2 || 'a > 20)
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testConsecutiveFilters(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    val filterDs = ds.filter('a % 2 !== 0).filter('b % 2 === 0)
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
-      "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = filterDs.collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterBasicType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getStringDataSet(env)
-
-    val filterDs = ds.toTable(tEnv, 'a).filter( 'a.like("H%") )
-
-    val expected = "Hi\n" + "Hello\n" + "Hello world\n" + "Hello world, how are you?\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnCustomType(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.getCustomTypeDataSet(env)
-    val filterDs = ds.toTable(tEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
-      .filter( 's.like("%a%") )
-
-    val expected = "3,3,Hello world, how are you?\n" + "3,4,I am fine.\n" + "3,5,Luke Skywalker\n"
-    val results = filterDs.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testFilterInvalidFieldName(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    // must fail. Field 'foo does not exist
-    ds.filter( 'foo === 2 )
-  }
-
-}


[3/3] flink git commit: [FLINK-3656] [table] Consolidate ITCases

Posted by tw...@apache.org.
[FLINK-3656] [table] Consolidate ITCases

Merge FilterIT/SelectIT to CalcITCases
Merge FromDataSet/ToTable to TableEnvironmentITCases
Merge aggregating ITCases
All batch ITCases use TableProgramsTestBase

This closes #2566.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7758571a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7758571a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7758571a

Branch: refs/heads/master
Commit: 7758571ae7a6a26859d91eb80e7b4df689e79c46
Parents: 8243138
Author: twalthr <tw...@apache.org>
Authored: Thu Sep 29 10:20:35 2016 +0200
Committer: twalthr <tw...@apache.org>
Committed: Thu Sep 29 15:43:37 2016 +0200

----------------------------------------------------------------------
 .../api/java/batch/TableEnvironmentITCase.java  | 473 +++++++++++++++++-
 .../java/batch/table/AggregationsITCase.java    | 204 +++++++-
 .../flink/api/java/batch/table/CalcITCase.java  | 290 +++++++++++
 .../api/java/batch/table/CastingITCase.java     |   2 -
 .../api/java/batch/table/DistinctITCase.java    |  76 ---
 .../api/java/batch/table/FilterITCase.java      | 170 -------
 .../api/java/batch/table/FromDataSetITCase.java | 499 -------------------
 .../batch/table/GroupedAggregationsITCase.java  | 124 -----
 .../flink/api/java/batch/table/JoinITCase.java  |  15 +-
 .../java/batch/table/PojoGroupingITCase.java    |  90 ----
 .../api/java/batch/table/SelectITCase.java      | 153 ------
 .../scala/batch/TableEnvironmentITCase.scala    | 129 +++++
 .../flink/api/scala/batch/TableSinkITCase.scala |  14 +-
 .../api/scala/batch/TableSourceITCase.scala     |  12 +-
 .../flink/api/scala/batch/sql/CalcITCase.scala  | 277 ++++++++++
 .../api/scala/batch/sql/FilterITCase.scala      | 158 ------
 .../api/scala/batch/sql/SelectITCase.scala      | 148 ------
 .../scala/batch/table/AggregationsITCase.scala  | 222 ++++++++-
 .../api/scala/batch/table/CalcITCase.scala      | 309 +++++++++++-
 .../api/scala/batch/table/DistinctITCase.scala  |  62 ---
 .../api/scala/batch/table/FilterITCase.scala    | 188 -------
 .../batch/table/GroupedAggregationsITCase.scala | 200 --------
 .../api/scala/batch/table/JoinITCase.scala      |  55 +-
 .../api/scala/batch/table/SelectITCase.scala    | 190 -------
 .../api/scala/batch/table/ToTableITCase.scala   | 158 ------
 .../batch/utils/TableProgramsTestBase.scala     |  24 +-
 .../api/scala/stream/table/CalcITCase.scala     | 285 +++++++++++
 .../api/scala/stream/table/FilterITCase.scala   | 143 ------
 .../api/scala/stream/table/SelectITCase.scala   | 175 -------
 29 files changed, 2214 insertions(+), 2631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
index 8fdb2da..5e40724 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/TableEnvironmentITCase.java
@@ -18,20 +18,31 @@
 
 package org.apache.flink.api.java.batch;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.*;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.TableException;
+import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.List;
-
 @RunWith(Parameterized.class)
 public class TableEnvironmentITCase extends TableProgramsTestBase {
 
@@ -39,6 +50,14 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
 		super(mode, configMode);
 	}
 
+	@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+	public static Collection<Object[]> parameters() {
+		return Arrays.asList(new Object[][] {
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT() }
+		});
+	}
+
 	@Test
 	public void testSimpleRegister() throws Exception {
 		final String tableName = "MyTable";
@@ -145,4 +164,452 @@ public class TableEnvironmentITCase extends TableProgramsTestBase {
 		// Must fail. Table is bound to different TableEnvironment.
 		tableEnv2.registerTable("MyTable", t);
 	}
+
+	@Test
+	public void testAsFromTuple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+			.select("a, b, c");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToTuple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+			.select("a, b, c");
+
+		TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		DataSet<?> ds = tableEnv.toDataSet(table, ti);
+		List<?> results = ds.collect();
+		String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
+			"(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
+			"(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
+			"(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
+			"(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
+			"(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
+			"(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromTupleToPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
+		data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
+		data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
+		data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data), "a, b, c, d")
+			.select("a, b, c, d");
+
+		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+		List<SmallPojo2> results = ds.collect();
+		String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPrivateFieldsPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PrivateSmallPojo> data = new ArrayList<>();
+		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+		List<SmallPojo2> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToPrivateFieldPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PrivateSmallPojo> data = new ArrayList<>();
+		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
+		List<PrivateSmallPojo2> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsWithPojoAndGenericTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PojoWithGeneric> data = new ArrayList<>();
+		data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
+		HashMap<String, String> hm1 = new HashMap<>();
+		hm1.put("test1", "test1");
+		data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
+		HashMap<String, String> hm2 = new HashMap<>();
+		hm2.put("abc", "cde");
+		data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"name AS a, " +
+				"age AS b, " +
+				"generic AS c, " +
+				"generic2 AS d")
+			.select("a, b, c, c as c2, d")
+			.select("a, b, c, c === c2, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Peter,28,{},true,[]\n" +
+			"Anna,56,{test1=test1},true,[]\n" +
+			"Lucy,42,{abc=cde},true,[]\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithToFewFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Not enough field names specified.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithToManyFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Too many field names specified.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Specified field names are not unique.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithNonFieldReference1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. as() does only allow field name expressions
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithNonFieldReference2() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. as() does only allow field name expressions
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b,  c");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassInput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassOutput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
+		tableEnv.toDataSet(t, MyNonStatic.class);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public class MyNonStatic {
+		public int number;
+	}
+
+	@SuppressWarnings("unused")
+	public static class SmallPojo {
+
+		public SmallPojo() { }
+
+		public SmallPojo(String name, int age, double salary, String department) {
+			this.name = name;
+			this.age = age;
+			this.salary = salary;
+			this.department = department;
+		}
+
+		public String name;
+		public int age;
+		public double salary;
+		public String department;
+	}
+
+	@SuppressWarnings("unused")
+	public static class PojoWithGeneric {
+		public String name;
+		public int age;
+		public HashMap<String, String> generic;
+		public ArrayList<String> generic2;
+
+		public PojoWithGeneric() {
+			// default constructor
+		}
+
+		public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
+				ArrayList<String> generic2) {
+			this.name = name;
+			this.age = age;
+			this.generic = generic;
+			this.generic2 = generic2;
+		}
+
+		@Override
+		public String toString() {
+			return name + "," + age + "," + generic + "," + generic2;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	public static class PrivateSmallPojo {
+
+		public PrivateSmallPojo() { }
+
+		public PrivateSmallPojo(String name, int age, double salary, String department) {
+			this.name = name;
+			this.age = age;
+			this.salary = salary;
+			this.department = department;
+		}
+
+		private String name;
+		private int age;
+		private double salary;
+		private String department;
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		public int getAge() {
+			return age;
+		}
+
+		public void setAge(int age) {
+			this.age = age;
+		}
+
+		public double getSalary() {
+			return salary;
+		}
+
+		public void setSalary(double salary) {
+			this.salary = salary;
+		}
+
+		public String getDepartment() {
+			return department;
+		}
+
+		public void setDepartment(String department) {
+			this.department = department;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	public static class SmallPojo2 {
+
+		public SmallPojo2() { }
+
+		public SmallPojo2(String a, int b, double c, String d) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+			this.d = d;
+		}
+
+		public String a;
+		public int b;
+		public double c;
+		public String d;
+
+		@Override
+		public String toString() {
+			return a + "," + b + "," + c + "," + d;
+		}
+	}
+
+	@SuppressWarnings("unused")
+	public static class PrivateSmallPojo2 {
+
+		public PrivateSmallPojo2() { }
+
+		public PrivateSmallPojo2(String a, int b, double c, String d) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+			this.d = d;
+		}
+
+		private String a;
+		private int b;
+		private double c;
+		private String d;
+
+		public String getA() {
+			return a;
+		}
+
+		public void setA(String a) {
+			this.a = a;
+		}
+
+		public int getB() {
+			return b;
+		}
+
+		public void setB(int b) {
+			this.b = b;
+		}
+
+		public double getC() {
+			return c;
+		}
+
+		public void setC(double c) {
+			this.c = c;
+		}
+
+		public String getD() {
+			return d;
+		}
+
+		public void setD(String d) {
+			this.d = d;
+		}
+
+		@Override
+		public String toString() {
+			return a + "," + b + "," + c + "," + d;
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
index 6bcac56..02f6e0b 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/AggregationsITCase.java
@@ -17,37 +17,40 @@
  */
 package org.apache.flink.api.java.batch.table;
 
+import java.io.Serializable;
+import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.Order;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.ValidationException;
+import org.apache.flink.examples.java.WordCountTable.WC;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
-import org.apache.flink.examples.java.WordCountTable.WC;
-
-import java.util.List;
 
 @RunWith(Parameterized.class)
-public class AggregationsITCase extends MultipleProgramsTestBase {
+public class AggregationsITCase extends TableProgramsTestBase {
 
-	public AggregationsITCase(TestExecutionMode mode){
-		super(mode);
+	public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
 	}
 
 	@Test
 	public void testAggregationTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
 
@@ -62,7 +65,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 	@Test(expected = ValidationException.class)
 	public void testAggregationOnNonExistingField() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		Table table =
 				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
@@ -79,7 +82,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testWorkingAggregationDataTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
 				env.fromElements(
@@ -100,7 +103,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testAggregationWithArithmetic() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		DataSource<Tuple2<Float, String>> input =
 				env.fromElements(
@@ -122,7 +125,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 	@Test
 	public void testAggregationWithTwoCount() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		DataSource<Tuple2<Float, String>> input =
 			env.fromElements(
@@ -144,7 +147,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 	@Test(expected = ValidationException.class)
 	public void testNonWorkingDataTypes() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
 
@@ -164,7 +167,7 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 	@Test(expected = ValidationException.class)
 	public void testNoNestedAggregation() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
 
@@ -181,10 +184,90 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		compareResultAsText(results, expected);
 	}
 
+	@Test(expected = ValidationException.class)
+	public void testGroupingOnNonExistentField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv
+			.fromDataSet(input, "a, b, c")
+			// must fail. Field foo is not in input
+			.groupBy("foo")
+			.select("a.avg");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testGroupingInvalidSelection() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv
+			.fromDataSet(input, "a, b, c")
+			.groupBy("a, b")
+			// must fail. Field c is not a grouping key or aggregation
+			.select("c");
+	}
+
+	@Test
+	public void testGroupedAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.groupBy("b").select("b, a.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupingKeyForwardIfNotUsed() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.groupBy("b").select("a.sum");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testGroupNoAggregation() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
+		List<Row> results = ds.collect();
+		compareResultAsText(results, expected);
+	}
+
 	@Test
 	public void testPojoAggregation() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 		DataSet<WC> input = env.fromElements(
 				new WC("Hello", 1),
 				new WC("Ciao", 1),
@@ -208,5 +291,90 @@ public class AggregationsITCase extends MultipleProgramsTestBase {
 		String expected = "Hello\n" + "Hola";
 		compareResultAsText(result, expected);
 	}
+
+	@Test
+	public void testPojoGrouping() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
+			new Tuple3<>("A", 23.0, "Z"),
+			new Tuple3<>("A", 24.0, "Y"),
+			new Tuple3<>("B", 1.0, "Z"));
+
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(data, "groupMe, value, name")
+			.select("groupMe, value, name")
+			.where("groupMe != 'B'");
+
+		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
+
+		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
+			.sortGroup("value", Order.DESCENDING)
+			.first(1);
+
+		List<MyPojo> resultList = result.collect();
+		compareResultAsText(resultList, "A,24.0,Y");
+	}
+
+	@Test
+	public void testDistinct() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table distinct = table.select("b").distinct();
+
+		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testDistinctAfterAggregate() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
+
+		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
+
+		Table distinct = table.groupBy("a, e").select("e").distinct();
+
+		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1\n" + "2\n" + "3\n";
+		compareResultAsText(results, expected);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public static class MyPojo implements Serializable {
+		private static final long serialVersionUID = 8741918940120107213L;
+
+		public String groupMe;
+		public double value;
+		public String name;
+
+		public MyPojo() {
+			// for serialization
+		}
+
+		public MyPojo(String groupMe, double value, String name) {
+			this.groupMe = groupMe;
+			this.value = value;
+			this.name = name;
+		}
+
+		@Override
+		public String toString() {
+			return groupMe + "," + value + "," + name;
+		}
+	}
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
new file mode 100644
index 0000000..fcdf2e1
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CalcITCase.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.batch.table;
+
+import java.util.Arrays;
+import java.util.Collection;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Table;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.table.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.table.TableEnvironment;
+import org.apache.flink.api.table.ValidationException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CalcITCase extends TableProgramsTestBase {
+
+	public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
+	}
+
+	@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
+	public static Collection<Object[]> parameters() {
+		return Arrays.asList(new Object[][] {
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
+			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL() }
+		});
+	}
+
+	@Test
+	public void testSimpleSelectAllWithAs() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+		Table result = in
+				.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSimpleSelectWithNaming() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+				.select("f0 as a, f1 as b")
+				.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSimpleSelectRenameAll() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds);
+
+		Table result = in
+			.select("f0 as a, f1 as b, f2 as c")
+			.select("a, b");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testSelectInvalidField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field foo does not exist
+			.select("a + 1, foo + 2");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testSelectAmbiguousFieldNames() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+
+		tableEnv.fromDataSet(ds, "a, b, c")
+			// Must fail. Field foo does not exist
+			.select("a + 1 as foo, b + 2 as foo");
+	}
+
+	@Test
+	public void testSelectStar() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+
+		Table result = in
+			.select("*");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+		                  "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+		                  "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+		                  "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+		                  "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+		                  "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+		                  "20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAllRejectingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("false");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAllPassingFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("true");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testFilterOnIntegerTupleField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter(" a % 2 = 0 ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testNotEquals() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+				.filter("!( a % 2 <> 0 ) ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
+				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
+				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testDisjunctivePreds() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+			.filter("a < 2 || a > 20");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testIntegerBiggerThan128() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		Table result = table
+			.filter("a = 300 ");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "300,1,Hello\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testFilterInvalidField() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+		Table table = tableEnv.fromDataSet(input, "a, b, c");
+
+		table
+			// Must fail. Field foo does not exist.
+			.filter("foo = 17");
+	}
+
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
index 9646076..333953b 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/CastingITCase.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple6;
@@ -32,7 +31,6 @@ import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
 import org.apache.flink.api.table.Row;
 import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java
deleted file mode 100644
index 7f10433..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/DistinctITCase.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class DistinctITCase extends MultipleProgramsTestBase {
-
-	public DistinctITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testDistinct() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table distinct = table.select("b").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDistinctAfterAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
-		Table distinct = table.groupBy("a, e").select("e").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n";
-		compareResultAsText(results, expected);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
deleted file mode 100644
index 7a2bedf..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FilterITCase.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FilterITCase extends TableProgramsTestBase {
-
-	public FilterITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testAllRejectingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("false");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAllPassingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("true");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testFilterOnIntegerTupleField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter(" a % 2 = 0 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNotEquals() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("!( a % 2 <> 0 ) ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDisjunctivePreds() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a < 2 || a > 20");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testIntegerBiggerThan128() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a = 300 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "300,1,Hello\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testFilterInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		table
-			// Must fail. Field foo does not exist.
-			.filter("foo = 17");
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
deleted file mode 100644
index e6b9226..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/FromDataSetITCase.java
+++ /dev/null
@@ -1,499 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import java.util.HashMap;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.TableException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class FromDataSetITCase extends TableProgramsTestBase {
-
-	public FromDataSetITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testAsFromTuple() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv
-			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
-			.select("a, b, c");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromAndToTuple() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv
-			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
-			.select("a, b, c");
-
-		TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
-			BasicTypeInfo.INT_TYPE_INFO,
-			BasicTypeInfo.LONG_TYPE_INFO,
-			BasicTypeInfo.STRING_TYPE_INFO);
-
-		DataSet<?> ds = tableEnv.toDataSet(table, ti);
-		List<?> results = ds.collect();
-		String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
-			"(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
-			"(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
-			"(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
-			"(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
-			"(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
-			"(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromTupleToPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
-		data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
-		data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
-		data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data), "a, b, c, d")
-			.select("a, b, c, d");
-
-		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
-		List<SmallPojo2> results = ds.collect();
-		String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<SmallPojo> data = new ArrayList<>();
-		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromPrivateFieldsPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<PrivateSmallPojo> data = new ArrayList<>();
-		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromAndToPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<SmallPojo> data = new ArrayList<>();
-		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
-		List<SmallPojo2> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsFromAndToPrivateFieldPojo() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<PrivateSmallPojo> data = new ArrayList<>();
-		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
-		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
-		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"department AS a, " +
-				"age AS b, " +
-				"salary AS c, " +
-				"name AS d")
-			.select("a, b, c, d");
-
-		DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
-		List<PrivateSmallPojo2> results = ds.collect();
-		String expected =
-			"Sales,28,4000.0,Peter\n" +
-			"Engineering,56,10000.0,Anna\n" +
-			"HR,42,6000.0,Lucy\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAsWithPojoAndGenericTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		List<PojoWithGeneric> data = new ArrayList<>();
-		data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
-		HashMap<String, String> hm1 = new HashMap<>();
-		hm1.put("test1", "test1");
-		data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
-		HashMap<String, String> hm2 = new HashMap<>();
-		hm2.put("abc", "cde");
-		data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
-
-		Table table = tableEnv
-			.fromDataSet(env.fromCollection(data),
-				"name AS a, " +
-				"age AS b, " +
-				"generic AS c, " +
-				"generic2 AS d")
-			.select("a, b, c, c as c2, d")
-			.select("a, b, c, c === c2, d");
-
-		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
-		List<Row> results = ds.collect();
-		String expected =
-			"Peter,28,{},true,[]\n" +
-			"Anna,56,{test1=test1},true,[]\n" +
-			"Lucy,42,{abc=cde},true,[]\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = TableException.class)
-	public void testAsWithToFewFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Not enough field names specified.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b");
-	}
-
-	@Test(expected = TableException.class)
-	public void testAsWithToManyFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Too many field names specified.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
-	}
-
-	@Test(expected = TableException.class)
-	public void testAsWithAmbiguousFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. Specified field names are not unique.
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
-	}
-
-	@Test(expected = TableException.class)
-	public void testAsWithNonFieldReference1() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. as() does only allow field name expressions
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
-	}
-
-	@Test(expected = TableException.class)
-	public void testAsWithNonFieldReference2() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail. as() does only allow field name expressions
-		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b,  c");
-	}
-
-	@Test(expected = TableException.class)
-	public void testNonStaticClassInput() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail since class is not static
-		tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
-	}
-
-	@Test(expected = TableException.class)
-	public void testNonStaticClassOutput() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		// Must fail since class is not static
-		Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
-		tableEnv.toDataSet(t, MyNonStatic.class);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public class MyNonStatic {
-		public int number;
-	}
-
-	@SuppressWarnings("unused")
-	public static class SmallPojo {
-
-		public SmallPojo() { }
-
-		public SmallPojo(String name, int age, double salary, String department) {
-			this.name = name;
-			this.age = age;
-			this.salary = salary;
-			this.department = department;
-		}
-
-		public String name;
-		public int age;
-		public double salary;
-		public String department;
-	}
-
-	@SuppressWarnings("unused")
-	public static class PojoWithGeneric {
-		public String name;
-		public int age;
-		public HashMap<String, String> generic;
-		public ArrayList<String> generic2;
-
-		public PojoWithGeneric() {
-			// default constructor
-		}
-
-		public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
-				ArrayList<String> generic2) {
-			this.name = name;
-			this.age = age;
-			this.generic = generic;
-			this.generic2 = generic2;
-		}
-
-		@Override
-		public String toString() {
-			return name + "," + age + "," + generic + "," + generic2;
-		}
-	}
-
-	@SuppressWarnings("unused")
-	public static class PrivateSmallPojo {
-
-		public PrivateSmallPojo() { }
-
-		public PrivateSmallPojo(String name, int age, double salary, String department) {
-			this.name = name;
-			this.age = age;
-			this.salary = salary;
-			this.department = department;
-		}
-
-		private String name;
-		private int age;
-		private double salary;
-		private String department;
-
-		public String getName() {
-			return name;
-		}
-
-		public void setName(String name) {
-			this.name = name;
-		}
-
-		public int getAge() {
-			return age;
-		}
-
-		public void setAge(int age) {
-			this.age = age;
-		}
-
-		public double getSalary() {
-			return salary;
-		}
-
-		public void setSalary(double salary) {
-			this.salary = salary;
-		}
-
-		public String getDepartment() {
-			return department;
-		}
-
-		public void setDepartment(String department) {
-			this.department = department;
-		}
-	}
-
-	@SuppressWarnings("unused")
-	public static class SmallPojo2 {
-
-		public SmallPojo2() { }
-
-		public SmallPojo2(String a, int b, double c, String d) {
-			this.a = a;
-			this.b = b;
-			this.c = c;
-			this.d = d;
-		}
-
-		public String a;
-		public int b;
-		public double c;
-		public String d;
-
-		@Override
-		public String toString() {
-			return a + "," + b + "," + c + "," + d;
-		}
-	}
-
-	@SuppressWarnings("unused")
-	public static class PrivateSmallPojo2 {
-
-		public PrivateSmallPojo2() { }
-
-		public PrivateSmallPojo2(String a, int b, double c, String d) {
-			this.a = a;
-			this.b = b;
-			this.c = c;
-			this.d = d;
-		}
-
-		private String a;
-		private int b;
-		private double c;
-		private String d;
-
-		public String getA() {
-			return a;
-		}
-
-		public void setA(String a) {
-			this.a = a;
-		}
-
-		public int getB() {
-			return b;
-		}
-
-		public void setB(int b) {
-			this.b = b;
-		}
-
-		public double getC() {
-			return c;
-		}
-
-		public void setC(double c) {
-			this.c = c;
-		}
-
-		public String getD() {
-			return d;
-		}
-
-		public void setD(String d) {
-			this.d = d;
-		}
-
-		@Override
-		public String toString() {
-			return a + "," + b + "," + c + "," + d;
-		}
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
deleted file mode 100644
index 1906040..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/GroupedAggregationsITCase.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.api.table.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class GroupedAggregationsITCase extends MultipleProgramsTestBase {
-
-	public GroupedAggregationsITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGroupingOnNonExistentField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			// must fail. Field foo is not in input
-			.groupBy("foo")
-			.select("a.avg");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGroupingInvalidSelection() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			.groupBy("a, b")
-			// must fail. Field c is not a grouping key or aggregation
-			.select("c");
-	}
-
-	@Test
-	public void testGroupedAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("b, a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupingKeyForwardIfNotUsed() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupNoAggregation() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-		List<Row> results = ds.collect();
-		compareResultAsText(results, expected);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
index e6db3b0..9676608 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/JoinITCase.java
@@ -18,29 +18,28 @@
 
 package org.apache.flink.api.java.batch.table;
 
-import org.apache.flink.api.table.Row;
-import org.apache.flink.api.table.Table;
+import java.util.List;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.table.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.api.table.Row;
+import org.apache.flink.api.table.Table;
 import org.apache.flink.api.table.TableEnvironment;
 import org.apache.flink.api.table.ValidationException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.util.List;
-
 
 @RunWith(Parameterized.class)
-public class JoinITCase extends MultipleProgramsTestBase {
+public class JoinITCase extends TableProgramsTestBase {
 
-	public JoinITCase(TestExecutionMode mode) {
-		super(mode);
+	public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/7758571a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
deleted file mode 100644
index ba564bf..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/api/java/batch/table/PojoGroupingITCase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.batch.table;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.table.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.table.Table;
-import org.apache.flink.api.table.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PojoGroupingITCase extends MultipleProgramsTestBase {
-
-	public PojoGroupingITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testPojoGrouping() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-			new Tuple3<>("A", 23.0, "Z"),
-			new Tuple3<>("A", 24.0, "Y"),
-			new Tuple3<>("B", 1.0, "Z"));
-
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		Table table = tableEnv
-			.fromDataSet(data, "groupMe, value, name")
-			.select("groupMe, value, name")
-			.where("groupMe != 'B'");
-
-		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
-
-		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-			.sortGroup("value", Order.DESCENDING)
-			.first(1);
-
-		List<MyPojo> resultList = result.collect();
-		compareResultAsText(resultList, "A,24.0,Y");
-	}
-
-	public static class MyPojo implements Serializable {
-		private static final long serialVersionUID = 8741918940120107213L;
-
-		public String groupMe;
-		public double value;
-		public String name;
-
-		public MyPojo() {
-			// for serialization
-		}
-
-		public MyPojo(String groupMe, double value, String name) {
-			this.groupMe = groupMe;
-			this.value = value;
-			this.name = name;
-		}
-
-		@Override
-		public String toString() {
-			return groupMe + "," + value + "," + name;
-		}
-	}
-}