You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/10 11:30:26 UTC

[1/2] flink git commit: [FLINK-5084] [table] Replace Java Table API integration tests by unit tests

Repository: flink
Updated Branches:
  refs/heads/master 614acc3e7 -> 649cf054e


http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
new file mode 100644
index 0000000..ace53d9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/AggregationsStringExpressionTest.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
+import org.junit._
+
+class AggregationsStringExpressionTest {
+
+  @Test
+  def testAggregationTypes(): Unit = {
+    // sum, min, max, count and avg on _1, written as Scala expressions and as a string.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+
+    val dslTable = input.select('_1.sum, '_1.min, '_1.max, '_1.count, '_1.avg)
+    val strTable = input.select("_1.sum, _1.min, _1.max, _1.count, _1.avg")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testWorkingAggregationDataTypes(): Unit = {
+    // avg on every numeric column and count on the String column, in both expression styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tableEnv)
+
+    val dslTable = input.select('_1.avg, '_2.avg, '_3.avg, '_4.avg, '_5.avg, '_6.avg, '_7.count)
+    val strTable = input.select("_1.avg, _2.avg, _3.avg, _4.avg, _5.avg, _6.avg, _7.count")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testProjection(): Unit = {
+    // avg/sum/count over a Byte column and avg/sum over a Short column, in both styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short),
+      (2: Byte, 2: Short)).toTable(tableEnv)
+
+    val dslTable = input.select('_1.avg, '_1.sum, '_1.count, '_2.avg, '_2.sum)
+    val strTable = input.select("_1.avg, _1.sum, _1.count, _2.avg, _2.sum")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testAggregationWithArithmetic(): Unit = {
+    // Aggregates combined with arithmetic: (_1 + 2).avg + 2 and _2.count + 5.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = execEnv.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tableEnv)
+
+    val dslTable = input.select(('_1 + 2).avg + 2, '_2.count + 5)
+    val strTable = input.select("(_1 + 2).avg + 2, _2.count + 5")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testAggregationWithTwoCount(): Unit = {
+    // Two count aggregates in a single select, in both expression styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = execEnv.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tableEnv)
+
+    val dslTable = input.select('_1.count, '_2.count)
+    val strTable = input.select("_1.count, _2.count")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testAggregationAfterProjection(): Unit = {
+    // Projects _1, _2, _3 first, then aggregates over the projected columns.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = execEnv.fromElements(
+      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
+      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tableEnv)
+
+    val dslTable = input.select('_1, '_2, '_3)
+      .select('_1.avg, '_2.sum, '_3.count)
+
+    val strTable = input.select("_1, _2, _3")
+      .select("_1.avg, _2.sum, _3.count")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testDistinct(): Unit = {
+    // distinct() on a single projected column; the plan nodes are compared directly.
+    val execEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.select('b).distinct()
+    val strTable = input.select("b").distinct()
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+    // groupBy followed by a plain select and distinct(); plan nodes are compared directly.
+    val execEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get5TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c, 'd, 'e)
+
+    val dslTable = input.groupBy('a, 'e).select('e).distinct()
+    val strTable = input.groupBy("a, e").select("e").distinct()
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testGroupedAggregate(): Unit = {
+    // Grouped sum that also selects the grouping key.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.groupBy('b).select('b, 'a.sum)
+    val strTable = input.groupBy("b").select("b, a.sum")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testGroupingKeyForwardIfNotUsed(): Unit = {
+    // Grouped sum whose select does not include the grouping key.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.groupBy('b).select('a.sum)
+    val strTable = input.groupBy("b").select("a.sum")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testGroupNoAggregation(): Unit = {
+    // A grouped aggregate followed by a second groupBy whose select has no aggregate.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input
+      .groupBy('b)
+      .select('a.sum as 'd, 'b)
+      .groupBy('b, 'd)
+      .select('b)
+
+    val strTable = input
+      .groupBy("b")
+      .select("a.sum as d, b")
+      .groupBy("b, d")
+      .select("b")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant1(): Unit = {
+    // Groups by a constant column (4 as four) listed before the real key.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.select('a, 4 as 'four, 'b)
+      .groupBy('four, 'a)
+      .select('four, 'b.sum)
+
+    val strTable = input.select("a, 4 as four, b")
+      .groupBy("four, a")
+      .select("four, b.sum")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testGroupedAggregateWithConstant2(): Unit = {
+    // Groups by a constant column (4 as four) listed after the real key.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
+    val strTable = input.select("b, 4 as four, a")
+      .groupBy("b, four")
+      .select("four, a.sum")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testGroupedAggregateWithExpression(): Unit = {
+    // Groups by a column and by an arithmetic expression (b % 3).
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get5TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c, 'd, 'e)
+
+    val dslTable = input.groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
+    val strTable = input.groupBy("e, b % 3")
+      .select("c.min, e, a.avg, d.count")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testGroupedAggregateWithFilter(): Unit = {
+    // A where clause applied to the result of a grouped aggregate.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.groupBy('b)
+      .select('b, 'a.sum)
+      .where('b === 2)
+    val strTable = input.groupBy("b")
+      .select("b, a.sum")
+      .where("b = 2")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
new file mode 100644
index 0000000..a5a5241
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CalcStringExpressionTest.scala
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import java.sql.{Date, Time, Timestamp}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
+import org.apache.flink.table.api.{TableEnvironment, Types}
+import org.apache.flink.table.expressions.Literal
+import org.junit._
+
+class CalcStringExpressionTest {
+
+  @Test
+  def testSimpleSelectAllWithAs(): Unit = {
+    // Selects all three named columns explicitly, in both expression styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.select('a, 'b, 'c)
+    val strTable = input.select("a, b, c")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testSimpleSelectWithNaming(): Unit = {
+    // Renames with "as" (using _1 twice), then selects a subset of the new names.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+
+    val dslTable = input
+      .select('_1 as 'a, '_2 as 'b, '_1 as 'c)
+      .select('a, 'b)
+
+    val strTable = input
+      .select("_1 as a, _2 as b, _1 as c")
+      .select("a, b")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testSimpleSelectRenameAll(): Unit = {
+    // Renames every column with "as", then selects a subset of the new names.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+
+    val dslTable = input
+      .select('_1 as 'a, '_2 as 'b, '_3 as 'c)
+      .select('a, 'b)
+
+    val strTable = input
+      .select("_1 as a, _2 as b, _3 as c")
+      .select("a, b")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testSelectStar(): Unit = {
+    // Star projection: '* in the Scala DSL versus "*" as a string.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.select('*)
+    val strTable = input.select("*")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testAllRejectingFilter(): Unit = {
+    // Filter with a constant false predicate: Literal(false) versus "false".
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter(Literal(false))
+    val strTable = input.filter("false")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testAllPassingFilter(): Unit = {
+    // Filter with a constant true predicate: Literal(true) versus "true".
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter(Literal(true))
+    val strTable = input.filter("true")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testFilterOnStringTupleField(): Unit = {
+    // like() applied to column c, in both expression styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter('c.like("%world%"))
+    val strTable = input.filter("c.like('%world%')")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testFilterOnIntegerTupleField(): Unit = {
+    // Modulo/equality predicate: === in the Scala DSL versus = in the string form.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter('a % 2 === 0)
+    val strTable = input.filter("a % 2 = 0 ")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testNotEquals(): Unit = {
+    // Inequality predicate: !== in the Scala DSL versus <> in the string form.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter('a % 2 !== 0)
+    val strTable = input.filter("a % 2 <> 0")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testDisjunctivePredicate(): Unit = {
+    // Disjunction of two comparisons, written with || in both styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter('a < 2 || 'a > 20)
+    val strTable = input.filter("a < 2 || a > 20")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testConsecutiveFilters(): Unit = {
+    // Two chained filters; the string form uses != and = for !== and ===.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+
+    val dslTable = input.filter('a % 2 !== 0).filter('b % 2 === 0)
+    val strTable = input.filter("a % 2 != 0").filter("b % 2 = 0")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testFilterBasicType(): Unit = {
+    // like() on the single column of the table built from getStringDataSet.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.getStringDataSet(execEnv).toTable(tableEnv, 'a)
+
+    val dslTable = input.filter('a.like("H%"))
+    val strTable = input.filter("a.like('H%')")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testFilterOnCustomType(): Unit = {
+    // like() on field s of a table built from the custom-type data set with renamed fields.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val customTypes = CollectionDataSets.getCustomTypeDataSet(execEnv)
+    val input = customTypes.toTable(tableEnv, 'myInt as 'i, 'myLong as 'l, 'myString as 's)
+
+    val dslTable = input.filter('s.like("%a%"))
+    val strTable = input.filter("s.like('%a%')")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testSimpleCalc(): Unit = {
+    // select -> where -> select chain, in both expression styles.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+
+    val dslTable = input.select('_1, '_2, '_3)
+      .where('_1 < 7)
+      .select('_1, '_3)
+
+    val strTable = input.select("_1, _2, _3")
+      .where("_1 < 7")
+      .select("_1, _3")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testCalcWithTwoFilters(): Unit = {
+    // Alternating select/where chain; the string form mixes = and === for equality.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+
+    val dslTable = input.select('_1, '_2, '_3)
+      .where('_1 < 7 && '_2 === 3)
+      .select('_1, '_3)
+      .where('_1 === 4)
+      .select('_1)
+
+    val strTable = input.select("_1, _2, _3")
+      .where("_1 < 7 && _2 = 3")
+      .select("_1, _3")
+      .where("_1 === 4")
+      .select("_1")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testCalcWithAggregation(): Unit = {
+    // Filter, grouped aggregate, then a filter on the aliased count.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+
+    val input = CollectionDataSets.get3TupleDataSet(execEnv).toTable(tableEnv)
+
+    val dslTable = input.select('_1, '_2, '_3)
+      .where('_1 < 15)
+      .groupBy('_2)
+      .select('_1.min, '_2.count as 'cnt)
+      .where('cnt > 3)
+
+    val strTable = input.select("_1, _2, _3")
+      .where("_1 < 15")
+      .groupBy("_2")
+      .select("_1.min, _2.count as cnt")
+      .where("cnt > 3")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match",
+      LogicalPlanFormatUtils.formatTempTableId(dslPlan),
+      LogicalPlanFormatUtils.formatTempTableId(strPlan))
+  }
+
+  @Test
+  def testCalcJoin(): Unit = {
+    // Join followed by interleaved select/where steps, in both expression styles.
+    val execEnv: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val left = CollectionDataSets.getSmall3TupleDataSet(execEnv).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(execEnv).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+
+    val dslTable = left.select('a, 'b).join(right).where('b === 'e).select('a, 'b, 'd, 'e, 'f)
+      .where('b > 1).select('a, 'd).where('d === 2)
+    val strTable = left.select("a, b").join(right).where("b = e").select("a, b, d, e, f")
+      .where("b > 1").select("a, d").where("d = 2")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testAdvancedDataTypes(): Unit = {
+    // BigDecimal, Date, Time and Timestamp columns plus literals; plans compared as strings.
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = execEnv
+      .fromElements((
+        BigDecimal("78.454654654654654").bigDecimal,
+        BigDecimal("4E+9999").bigDecimal,
+        Date.valueOf("1984-07-12"),
+        Time.valueOf("14:34:24"),
+        Timestamp.valueOf("1984-07-12 14:34:24")))
+      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e)
+
+    val dslTable = input.select('a, 'b, 'c, 'd, 'e,
+      BigDecimal("11.2"), BigDecimal("11.2").bigDecimal, "1984-07-12".cast(Types.DATE),
+      "14:34:24".cast(Types.TIME), "1984-07-12 14:34:24".cast(Types.TIMESTAMP))
+    val strTable = input.select("a, b, c, d, e, 11.2, 11.2," +
+      "'1984-07-12'.toDate, '14:34:24'.toTime," +
+      "'1984-07-12 14:34:24'.toTimestamp")
+
+    val dslPlan = dslTable.logicalPlan.toString
+    val strPlan = strTable.logicalPlan.toString
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+
+  @Test
+  def testIntegerBiggerThan128(): Unit = {
+    val execEnv = ExecutionEnvironment.getExecutionEnvironment
+    val tblEnv = TableEnvironment.getTableEnvironment(execEnv)
+    val input = execEnv.fromElements((300, 1L, "Hello")).toTable(tblEnv, 'a, 'b, 'c)
+    // Equality filter on the literal 300, via the Scala DSL and via the string parser.
+    val dslTable = input.filter('a === 300)
+    val strTable = input.filter("a = 300")
+
+    val dslPlan = dslTable.logicalPlan
+    val strPlan = strTable.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", dslPlan, strPlan)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
new file mode 100644
index 0000000..19d27fe
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/CastingStringExpressionTest.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.table.api.scala._
+import org.junit._
+
+class CastingStringExpressionTest {
+
+  @Test
+  def testNumericAutocastInArithmetic(): Unit = {
+    // Adds a literal of a different numeric type to each column, in both expression styles.
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
+    val t1 = table.select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+      '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+    val t2 = table.select("_1 + 1, _2 + 1, _3 + 1L, _4 + 1.0f, " +
+      "_5 + 1.0d, _6 + 1, _7 + 1.0d, _8 + _1")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testNumericAutocastInComparison(): Unit = {
+    // Compares each column with a literal of a different numeric type, in both styles.
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
+      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
+      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
+    val t1 = table.filter('a > 1 && 'b > 1 && 'c > 1L &&
+      'd > 1.0f && 'e > 1.0d && 'f > 1)
+    val t2 = table
+      .filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testCasting(): Unit = {
+    // Each cast must plan identically via the Scala DSL (t1) and the string parser (t2).
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
+    val t1 = table.select(
+      // * -> String
+      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+      // NUMERIC TYPE -> Boolean
+      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+      // NUMERIC TYPE -> NUMERIC TYPE
+      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+      // Boolean -> NUMERIC TYPE
+      '_4.cast(DOUBLE),
+      // identity casting
+      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+    val t2 = table.select(
+      // * -> String
+      "_1.cast(STRING), _2.cast(STRING), _3.cast(STRING), _4.cast(STRING)," +
+        // NUMERIC TYPE -> Boolean
+        "_1.cast(BOOL), _2.cast(BOOL), _3.cast(BOOL)," +
+        // NUMERIC TYPE -> NUMERIC TYPE
+        "_1.cast(DOUBLE), _2.cast(INT), _3.cast(SHORT)," +
+        // Boolean -> NUMERIC TYPE
+        "_4.cast(DOUBLE)," +
+        // identity casting
+        "_1.cast(INT), _2.cast(DOUBLE), _3.cast(LONG), _4.cast(BOOL)")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+
+  @Test
+  def testCastFromString(): Unit = {
+    // Casts String columns to numeric and boolean types, in both expression styles.
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
+    val t1 = table.select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+      '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+    val t2 = table.select(
+      "_1.cast(BYTE), _1.cast(SHORT), _1.cast(INT), _1.cast(LONG), " +
+        "_3.cast(DOUBLE), _3.cast(FLOAT), _2.cast(BOOL)")
+
+    val lPlan1 = t1.logicalPlan
+    val lPlan2 = t2.logicalPlan
+
+    Assert.assertEquals("Logical Plans do not match", lPlan1, lPlan2)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
new file mode 100644
index 0000000..025cda9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/stringexpr/JoinStringExpressionTest.scala
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.stringexpr
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.scala.batch.utils.LogicalPlanFormatUtils
+import org.apache.flink.table.expressions.Literal
+import org.junit._
+
+class JoinStringExpressionTest {
+  // All tests here check that Scala and string expressions yield the same logical plan.
+  @Test
+  def testJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Join on b = e, written with Scala expressions and with string expressions.
+    val viaDsl = left.join(right).where('b === 'e).select('c, 'g)
+    val viaString = left.join(right).where("b === e").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testJoinWithFilter(): Unit = {
+    // Field names are assigned with as(...) after the conversion instead of inside toTable(...).
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv).as('a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv).as('d, 'e, 'f, 'g, 'h)
+    // Join key b = e combined with a predicate on b alone.
+    val viaDsl = left.join(right).where('b === 'e && 'b < 2).select('c, 'g)
+    val viaString = left.join(right).where("b === e && b < 2").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testJoinWithJoinFilter(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Join key b = e plus a predicate on a alone and a predicate comparing h with b.
+    val viaDsl = left.join(right).where('b === 'e && 'a < 6 && 'h < 'b).select('c, 'g)
+    val viaString = left.join(right).where("b === e && a < 6 && h < b").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testJoinWithMultipleKeys(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Two equality predicates (a = d and b = h), applied with filter instead of where.
+    val viaDsl = left.join(right).filter('a === 'd && 'b === 'h).select('c, 'g)
+    val viaString = left.join(right).filter("a === d && b === h").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testJoinWithAggregation(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Non-grouped count over the result of a join on a = d.
+    val viaDsl = left.join(right).where('a === 'd).select('g.count)
+    val viaString = left.join(right).where("a === d").select("g.count")
+    // Compare the plan strings after formatTempTableId (presumably normalizes temp table ids).
+    val dslPlan = LogicalPlanFormatUtils.formatTempTableId(viaDsl.logicalPlan.toString)
+    val stringPlan = LogicalPlanFormatUtils.formatTempTableId(viaString.logicalPlan.toString)
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      dslPlan,
+      stringPlan)
+  }
+
+  @Test
+  def testJoinWithGroupedAggregation(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Join on a = d, group by both keys and aggregate one field of each input.
+    val viaDsl = left.join(right)
+      .where('a === 'd)
+      .groupBy('a, 'd)
+      .select('b.sum, 'g.count)
+    val viaString = left.join(right)
+      .where("a = d")
+      .groupBy("a, d")
+      .select("b.sum, g.count")
+    // Compare the plan strings after formatTempTableId (presumably normalizes temp table ids).
+    val dslPlan = LogicalPlanFormatUtils.formatTempTableId(viaDsl.logicalPlan.toString)
+    val stringPlan = LogicalPlanFormatUtils.formatTempTableId(viaString.logicalPlan.toString)
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      dslPlan,
+      stringPlan)
+  }
+
+  @Test
+  def testJoinPushThroughJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Three inputs with pairwise disjoint field names.
+    val left = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    val third = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tableEnv, 'j, 'k, 'l)
+    // The first join only has the constant predicate true; the keys follow the second join.
+    val viaDsl = left.join(right)
+      .where(Literal(true))
+      .join(third)
+      .where('a === 'd && 'e === 'k)
+      .select('a, 'f, 'l)
+    val viaString = left.join(right)
+      .where("true")
+      .join(third)
+      .where("a === d && e === k")
+      .select("a, f, l")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testJoinWithDisjunctivePred(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Equality on a = d combined with a disjunction of two further equality predicates.
+    val viaDsl = left.join(right).filter('a === 'd && ('b === 'e || 'b === 'e - 10)).select('c, 'g)
+    val viaString = left.join(right).filter("a = d && (b = e || b = e - 10)").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testJoinWithExpressionPreds(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Two inputs with disjoint field names.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Equality predicates whose operands are arithmetic expressions instead of plain fields.
+    val viaDsl = left.join(right).filter('b === 'h + 1 && 'a - 1 === 'd + 2).select('c, 'g)
+    val viaString = left.join(right).filter("b = h + 1 && a - 1 = d + 2").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testLeftJoinWithMultipleKeys(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    tableEnv.getConfig.setNullCheck(true)
+    // Two inputs with disjoint field names; null checking was enabled above.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Left outer join on two equality predicates.
+    val viaDsl = left.leftOuterJoin(right, 'a === 'd && 'b === 'h).select('c, 'g)
+    val viaString = left.leftOuterJoin(right, "a = d && b = h").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testRightJoinWithMultipleKeys(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    tableEnv.getConfig.setNullCheck(true)
+    // Two inputs with disjoint field names; null checking was enabled above.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Right outer join on two equality predicates.
+    val viaDsl = left.rightOuterJoin(right, 'a === 'd && 'b === 'h).select('c, 'g)
+    val viaString = left.rightOuterJoin(right, "a = d && b = h").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testRightJoinWithNotOnlyEquiJoin(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    tableEnv.getConfig.setNullCheck(true)
+    // Two inputs with disjoint field names; null checking was enabled above.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Right outer join with one equality predicate and one less-than predicate.
+    val viaDsl = left.rightOuterJoin(right, 'a === 'd && 'b < 'h).select('c, 'g)
+    val viaString = left.rightOuterJoin(right, "a = d && b < h").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+  @Test
+  def testFullOuterJoinWithMultipleKeys(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    tableEnv.getConfig.setNullCheck(true)
+    // Two inputs with disjoint field names; null checking was enabled above.
+    val left = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets.get5TupleDataSet(env).toTable(tableEnv, 'd, 'e, 'f, 'g, 'h)
+    // Full outer join on two equality predicates.
+    val viaDsl = left.fullOuterJoin(right, 'a === 'd && 'b === 'h).select('c, 'g)
+    val viaString = left.fullOuterJoin(right, "a = d && b = h").select("c, g")
+    // Both variants must yield the same logical plan.
+    Assert.assertEquals(
+      "Logical Plans do not match",
+      viaDsl.logicalPlan,
+      viaString.logicalPlan)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
new file mode 100644
index 0000000..5cf3f84
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+class AggregationsValidationTest {
+  // All tests in this class build an invalid aggregation and expect a ValidationException.
+  @Test(expected = classOf[ValidationException])
+  def testNonWorkingAggregationDataTypes(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.fromElements(("Hello", 1)).toTable(tEnv)
+      // Must fail. Field '_1 is not a numeric type.
+      .select('_1.sum)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoNestedAggregations(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    env.fromElements(("Hello", 1)).toTable(tEnv)
+      // Must fail. Sum aggregation can not be chained.
+      .select('_2.sum.sum)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingOnNonExistentField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. '_foo not a valid field
+      .groupBy('_foo)
+      .select('a.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testGroupingInvalidSelection(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      .groupBy('a, 'b)
+      // must fail. 'c is not a grouping key or aggregation
+      .select('c)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testAggregationOnNonExistingField(): Unit = {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+      // Must fail. Field 'foo does not exist.
+      .select('foo.avg)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testAggregationOnNonExistingFieldJava(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    // Must fail. Field foo does not exist.
+    CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv).select("foo.avg")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testNonWorkingAggregationDataTypesJava(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+    // Must fail. Cannot compute SUM aggregate on the String field _2 of the Scala tuple.
+    table.select("_2.sum")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testNoNestedAggregationsJava(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1f, "Hello")).toTable(tableEnv)
+    // Must fail. Aggregation on aggregation not allowed (_1 is the existing Float field).
+    table.select("_1.sum.sum")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingOnNonExistentFieldJava(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    input
+      // must fail. Field foo is not in input
+      .groupBy("foo")
+      .select("a.avg")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testGroupingInvalidSelectionJava(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val input = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+    input
+      .groupBy("a, b")
+      // must fail. Field c is not a grouping key or aggregation
+      .select("c")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
new file mode 100644
index 0000000..846585b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/CalcValidationTest.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.scala._
+import org.apache.flink.types.Row
+import org.junit.Assert._
+import org.junit._
+
+class CalcValidationTest {
+  // All tests in this class use select/filter/alias incorrectly and expect an exception.
+  @Test(expected = classOf[ValidationException])
+  def testSelectInvalidFieldFields(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. Field 'foo does not exist
+      .select('a, 'foo)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. 'a and 'b are both renamed to 'foo
+      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testSelectAmbiguousRenaming2(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+      // must fail. 'b is renamed to 'a, which is already selected
+      .select('a, 'b as 'a).toDataSet[Row].print()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testFilterInvalidFieldName(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    // must fail. Field 'foo does not exist
+    ds.filter('foo === 2)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testSelectInvalidField(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+
+    // Must fail. Field foo does not exist
+    ds.select("a + 1, foo + 2")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testSelectAmbiguousFieldNames(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+
+    // Must fail. Both expressions are renamed to foo
+    ds.select("a + 1 as foo, b + 2 as foo")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testFilterInvalidField(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = CollectionDataSets.get3TupleDataSet(env).toTable(tableEnv, 'a, 'b, 'c)
+
+    // Must fail. Field foo does not exist.
+    table.filter("foo = 17")
+  }
+
+  @Test
+  def testAliasStarException(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    // Each block uses '* in a position where it is rejected; fail(...) runs if nothing is thrown.
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
+      fail("TableException expected")
+    } catch {
+      case _: TableException => // expected: '* used as field name in toTable
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
+        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => // expected: '* used as alias in select
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => // expected: '* used as field name in as
+    }
+
+    try {
+      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
+      fail("ValidationException expected")
+    } catch {
+      case _: ValidationException => // expected: '* selected together with another field
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
new file mode 100644
index 0000000..4837fcc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/JoinValidationTest.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.junit._
+
+class JoinValidationTest {
+  // All tests in this class build an invalid join and expect the annotated exception.
+  @Test(expected = classOf[ValidationException])
+  def testJoinNonExistingKey(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Field 'foo does not exist
+      .where('foo === 'e)
+      .select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithNonMatchingKeyTypes(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. Field 'a is Int, and 'g is String
+      .where('a === 'g)
+      .select('c, 'g).collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinWithAmbiguousFields(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
+
+    ds1.join(ds2)
+      // must fail. Both inputs share the same field 'c
+      .where('a === 'd)
+      .select('c, 'g)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testNoEqualityJoinPredicate1(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. No equality join predicate ('d and 'f both belong to ds2)
+      .where('d === 'f)
+      .select('c, 'g).collect()
+  }
+
+  @Test(expected = classOf[TableException])
+  def testNoEqualityJoinPredicate2(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+
+    ds1.join(ds2)
+      // must fail. No equality join predicate
+      .where('a < 'd)
+      .select('c, 'g).collect()
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinTablesFromDifferentEnvs(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
+
+    // Must fail. Tables are bound to different TableEnvironments.
+    ds1.join(ds2).where('b === 'e).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoJoinCondition(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    // Must fail. NOTE(review): presumably 'b < 3 is rejected in an outer join - confirm.
+    ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testNoEquiJoin(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setNullCheck(true)
+
+    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
+    // Must fail. The outer join condition 'b < 'd contains no equality predicate.
+    ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testJoinNonExistingKeyJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'h)
+    // Must fail. Field foo does not exist.
+    in1.join(in2).where("foo === e").select("c, g")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testJoinWithNonMatchingKeyTypesJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'h)
+    in1.join(in2)
+      // Must fail. Types of join fields are not compatible (Integer and String)
+      .where("a === g").select("c, g")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  @throws[Exception]
+  def testJoinWithAmbiguousFieldsJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tableEnv.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tableEnv.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'c)
+    // Must fail. Join input have overlapping field names.
+    in1.join(in2).where("a === d").select("c, g")
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testJoinTablesFromDifferentEnvsJava(): Unit = {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv1 = TableEnvironment.getTableEnvironment(env)
+    val tEnv2 = TableEnvironment.getTableEnvironment(env)
+    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env)
+    val ds2 = CollectionDataSets.get5TupleDataSet(env)
+    val in1 = tEnv1.fromDataSet(ds1, 'a, 'b, 'c)
+    val in2 = tEnv2.fromDataSet(ds2, 'd, 'e, 'f, 'g, 'h)
+    // Must fail. Tables are bound to different TableEnvironments.
+    in1.join(in2).where("a === d").select("g.count")
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
new file mode 100644
index 0000000..baac9ea
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SetOperatorsValidationTest.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+/**
+  * Validation tests for the set operators of the batch Table API
+  * (`unionAll`, `minus`, `minusAll` and `intersect`).
+  *
+  * Every test builds an invalid query and expects a [[ValidationException]].
+  */
+class SetOperatorsValidationTest {
+
+  /** `unionAll` of a three-field table with a five-field table. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentColumnSize(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val narrow = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+    val wide = CollectionDataSets
+      .get5TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
+
+    // Expected to be rejected: the inputs do not have the same number of columns.
+    narrow.unionAll(wide)
+  }
+
+  /** `unionAll` of two three-field tables whose field types are not the same. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionDifferentFieldTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val left = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+    // The first three fields of the five-field data set, projected to the same names.
+    val right = CollectionDataSets
+      .get5TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // Expected to be rejected: the union inputs have different field types.
+    left.unionAll(right)
+  }
+
+  /** `unionAll` of two tables that belong to different TableEnvironments. */
+  @Test(expected = classOf[ValidationException])
+  def testUnionTablesFromDifferentEnvs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val firstTEnv = TableEnvironment.getTableEnvironment(env)
+    val secondTEnv = TableEnvironment.getTableEnvironment(env)
+
+    val left = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(firstTEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(secondTEnv, 'a, 'b, 'c)
+
+    // Expected to be rejected: the tables are bound to different TableEnvironments.
+    left.unionAll(right).select('c)
+  }
+
+  /** `minus` of two three-field tables whose field types are not the same. */
+  @Test(expected = classOf[ValidationException])
+  def testMinusDifferentFieldTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val left = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+    // The first three fields of the five-field data set, projected to the same names.
+    val right = CollectionDataSets
+      .get5TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // Expected to be rejected: the minus inputs have different field types.
+    left.minus(right)
+  }
+
+  /** `minusAll` of two tables that belong to different TableEnvironments. */
+  @Test(expected = classOf[ValidationException])
+  def testMinusAllTablesFromDifferentEnvs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val firstTEnv = TableEnvironment.getTableEnvironment(env)
+    val secondTEnv = TableEnvironment.getTableEnvironment(env)
+
+    val left = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(firstTEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(secondTEnv, 'a, 'b, 'c)
+
+    // Expected to be rejected: the tables are bound to different TableEnvironments.
+    left.minusAll(right).select('c)
+  }
+
+  /** `intersect` of two three-field tables whose field types are not the same. */
+  @Test(expected = classOf[ValidationException])
+  def testIntersectWithDifferentFieldTypes(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val left = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c)
+    // The first three fields of the five-field data set, projected to the same names.
+    val right = CollectionDataSets
+      .get5TupleDataSet(env)
+      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
+      .select('a, 'b, 'c)
+
+    // Expected to be rejected: the intersect inputs have different field types.
+    left.intersect(right)
+  }
+
+  /** `intersect` of two tables that belong to different TableEnvironments. */
+  @Test(expected = classOf[ValidationException])
+  def testIntersectTablesFromDifferentEnvs(): Unit = {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val firstTEnv = TableEnvironment.getTableEnvironment(env)
+    val secondTEnv = TableEnvironment.getTableEnvironment(env)
+
+    val left = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(firstTEnv, 'a, 'b, 'c)
+    val right = CollectionDataSets
+      .getSmall3TupleDataSet(env)
+      .toTable(secondTEnv, 'a, 'b, 'c)
+
+    // Expected to be rejected: the tables are bound to different TableEnvironments.
+    left.intersect(right).select('c)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
new file mode 100644
index 0000000..4188f51
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/SortValidationTest.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table.validation
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import org.apache.flink.api.scala.{ExecutionEnvironment, _}
+import org.apache.flink.types.Row
+import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.junit._
+
+/**
+  * Validation test for `limit` in the batch Table API.
+  */
+class SortValidationTest {
+
+  /**
+    * Obtains an [[ExecutionEnvironment]] and sets its parallelism to 4.
+    *
+    * @return the configured execution environment
+    */
+  def getExecutionEnvironment: ExecutionEnvironment = {
+    val executionEnv = ExecutionEnvironment.getExecutionEnvironment
+    executionEnv.setParallelism(4)
+    executionEnv
+  }
+
+  /**
+    * Applies `limit(0, 5)` to a table that has not been ordered and expects a
+    * [[ValidationException]].
+    */
+  @Test(expected = classOf[ValidationException])
+  def testFetchWithoutOrder(): Unit = {
+    val env = getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    // The limit is applied directly to the source table; there is no ordering before it.
+    val limited = CollectionDataSets
+      .get3TupleDataSet(env)
+      .toTable(tEnv)
+      .limit(0, 5)
+
+    limited.toDataSet[Row].collect()
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
new file mode 100644
index 0000000..435d6b9
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/utils/LogicalPlanFormatUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.utils
+
+/**
+  * Helpers that normalize printed logical plans so that they can be compared as text.
+  */
+object LogicalPlanFormatUtils {
+  private val tempPattern = """TMP_\d+""".r
+
+  /**
+    * Normalizes the given plan string.
+    *
+    * Every `ArrayBuffer(` is rewritten to `List(`, and every `TMP_<n>` id is rebased by
+    * subtracting the smallest id found in the string, so the lowest one becomes `TMP_0`.
+    * A string that contains no `TMP_<n>` id gets only the first rewrite.
+    *
+    * @param preStr the plan string to normalize
+    * @return the normalized plan string
+    */
+  def formatTempTableId(preStr: String): String = {
+    // Literal replacement: a fixed token needs no regular expression.
+    val str = preStr.replace("ArrayBuffer(", "List(")
+    val minId = getMinTempTableId(str)
+    tempPattern.replaceAllIn(str, s => "TMP_" + (s.matched.substring(4).toInt - minId))
+  }
+
+  /**
+    * Returns the smallest numeric suffix among the `TMP_<n>` ids in `logicalStr`,
+    * or 0 when the string contains none (`min` on an empty iterator would throw).
+    */
+  private def getMinTempTableId(logicalStr: String): Long = {
+    tempPattern.findAllIn(logicalStr)
+      .map(_.substring(4).toInt)
+      .reduceOption(_ min _)
+      .getOrElse(0)
+  }
+}


[2/2] flink git commit: [FLINK-5084] [table] Replace Java Table API integration tests by unit tests

Posted by tw...@apache.org.
[FLINK-5084] [table] Replace Java Table API integration tests by unit tests

This closes #2977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649cf054
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649cf054
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649cf054

Branch: refs/heads/master
Commit: 649cf054e692ea7ffba3125377300fa0496908e7
Parents: 614acc3
Author: mtunique <oa...@gmail.com>
Authored: Fri Dec 9 13:23:36 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 10 11:54:23 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/java/batch/ExplainTest.java | 160 --------
 .../java/batch/table/AggregationsITCase.java    | 380 ------------------
 .../table/api/java/batch/table/CalcITCase.java  | 324 ----------------
 .../api/java/batch/table/CastingITCase.java     | 140 -------
 .../table/api/java/batch/table/JoinITCase.java  | 207 ----------
 .../table/api/scala/batch/sql/CalcITCase.scala  |   2 +-
 .../scala/batch/table/AggregationsITCase.scala  |  74 +---
 .../api/scala/batch/table/CalcITCase.scala      | 101 ++---
 .../api/scala/batch/table/CastingITCase.scala   | 104 +++++
 .../api/scala/batch/table/JoinITCase.scala      | 113 +-----
 .../scala/batch/table/SetOperatorsITCase.scala  |  92 +----
 .../api/scala/batch/table/SortITCase.scala      |  11 -
 .../AggregationsStringExpressionTest.scala      | 342 ++++++++++++++++
 .../stringexpr/CalcStringExpressionTest.scala   | 386 +++++++++++++++++++
 .../CastingStringExpressionTest.scala           | 121 ++++++
 .../stringexpr/JoinStringExpressionTest.scala   | 276 +++++++++++++
 .../validation/AggregationsValidationTest.scala | 139 +++++++
 .../table/validation/CalcValidationTest.scala   | 138 +++++++
 .../table/validation/JoinValidationTest.scala   | 188 +++++++++
 .../validation/SetOperatorsValidationTest.scala | 119 ++++++
 .../table/validation/SortValidationTest.scala   |  47 +++
 .../batch/utils/LogicalPlanFormatUtils.scala    |  35 ++
 22 files changed, 1931 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
deleted file mode 100644
index 114579f..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-
-public class ExplainTest extends MultipleProgramsTestBase {
-
-	public ExplainTest() {
-		super(TestExecutionMode.CLUSTER);
-	}
-
-	private static String testFilePath = ExplainTest.class.getResource("/").getFile();
-
-	@Test
-	public void testFilterWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
-		Table table = tableEnv
-			.fromDataSet(input, "a, b")
-			.filter("a % 2 = 0");
-
-		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testFilter0.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testFilterWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
-		Table table = tableEnv
-			.fromDataSet(input, "a, b")
-			.filter("a % 2 = 0");
-
-		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testFilter1.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testJoinWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "a, b");
-		Table table2 = tableEnv.fromDataSet(input2, "c, d");
-		Table table = table1
-			.join(table2)
-			.where("b = d")
-			.select("a, c");
-
-		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testJoin0.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testJoinWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "a, b");
-		Table table2 = tableEnv.fromDataSet(input2, "c, d");
-		Table table = table1
-			.join(table2)
-			.where("b = d")
-			.select("a, c");
-
-		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testJoin1.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testUnionWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "count, word");
-		Table table2 = tableEnv.fromDataSet(input2, "count, word");
-		Table table = table1.unionAll(table2);
-
-		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testUnion0.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testUnionWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "count, word");
-		Table table2 = tableEnv.fromDataSet(input2, "count, word");
-		Table table = table1.unionAll(table2);
-
-		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testUnion1.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
deleted file mode 100644
index d37ebb5..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.java.batch.table;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.examples.java.WordCountTable.WC;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends TableProgramsTestBase {
-
-	public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testAggregationTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "231,1,21,21,11";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testAggregationOnNonExistingField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table =
-				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-		Table result =
-				table.select("foo.avg");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testWorkingAggregationDataTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
-				env.fromElements(
-						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
-						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table result =
-				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,1,1,1.5,1.5,2";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregationWithArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input =
-				env.fromElements(
-						new Tuple2<>(1f, "Hello"),
-						new Tuple2<>(2f, "Ciao"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				table.select("(f0 + 2).avg + 2, f1.count + 5");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "5.5,7";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregationWithTwoCount() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input =
-			env.fromElements(
-				new Tuple2<>(1f, "Hello"),
-				new Tuple2<>(2f, "Ciao"));
-
-		Table table =
-			tableEnv.fromDataSet(input);
-
-		Table result =
-			table.select("f0.count, f1.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testNonWorkingDataTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				// Must fail. Cannot compute SUM aggregate on String field.
-				table.select("f1.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testNoNestedAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				// Must fail. Aggregation on aggregation not allowed.
-				table.select("f0.sum.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGroupingOnNonExistentField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			// must fail. Field foo is not in input
-			.groupBy("foo")
-			.select("a.avg");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGroupingInvalidSelection() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			.groupBy("a, b")
-			// must fail. Field c is not a grouping key or aggregation
-			.select("c");
-	}
-
-	@Test
-	public void testGroupedAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("b, a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupingKeyForwardIfNotUsed() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupNoAggregation() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-		List<Row> results = ds.collect();
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testPojoAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-		DataSet<WC> input = env.fromElements(
-				new WC("Hello", 1),
-				new WC("Ciao", 1),
-				new WC("Hello", 1),
-				new WC("Hola", 1),
-				new WC("Hola", 1));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table filtered = table
-				.groupBy("word")
-				.select("word.count as frequency, word")
-				.filter("frequency = 2");
-
-		List<String> result = tableEnv.toDataSet(filtered, WC.class)
-				.map(new MapFunction<WC, String>() {
-					public String map(WC value) throws Exception {
-						return value.word;
-					}
-				}).collect();
-		String expected = "Hello\n" + "Hola";
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testPojoGrouping() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-			new Tuple3<>("A", 23.0, "Z"),
-			new Tuple3<>("A", 24.0, "Y"),
-			new Tuple3<>("B", 1.0, "Z"));
-
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv
-			.fromDataSet(data, "groupMe, value, name")
-			.select("groupMe, value, name")
-			.where("groupMe != 'B'");
-
-		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
-
-		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-			.sortGroup("value", Order.DESCENDING)
-			.first(1);
-
-		List<MyPojo> resultList = result.collect();
-		compareResultAsText(resultList, "A,24.0,Y");
-	}
-
-	@Test
-	public void testDistinct() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table distinct = table.select("b").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDistinctAfterAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
-		Table distinct = table.groupBy("a, e").select("e").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n";
-		compareResultAsText(results, expected);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static class MyPojo implements Serializable {
-		private static final long serialVersionUID = 8741918940120107213L;
-
-		public String groupMe;
-		public double value;
-		public String name;
-
-		public MyPojo() {
-			// for serialization
-		}
-
-		public MyPojo(String groupMe, double value, String name) {
-			this.groupMe = groupMe;
-			this.value = value;
-			this.name = name;
-		}
-
-		@Override
-		public String toString() {
-			return groupMe + "," + value + "," + name;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
deleted file mode 100644
index b1ef563..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.Arrays;
-import java.util.Collection;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CalcITCase extends TableProgramsTestBase {
-
-	public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-	public static Collection<Object[]> parameters() {
-		return Arrays.asList(new Object[][] {
-			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
-			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL() }
-		});
-	}
-
-	@Test
-	public void testSimpleSelectAllWithAs() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-				.select("a, b, c");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectWithNaming() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-				.select("f0 as a, f1 as b")
-				.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectRenameAll() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-			.select("f0 as a, f1 as b, f2 as c")
-			.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testSelectInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1, foo + 2");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testSelectAmbiguousFieldNames() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1 as foo, b + 2 as foo");
-	}
-
-	@Test
-	public void testSelectStar() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-			.select("*");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-		                  "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-		                  "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-		                  "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-		                  "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-		                  "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-		                  "20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAllRejectingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("false");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAllPassingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("true");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testFilterOnIntegerTupleField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter(" a % 2 = 0 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNotEquals() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("!( a % 2 <> 0 ) ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDisjunctivePreds() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a < 2 || a > 20");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testIntegerBiggerThan128() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a = 300 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "300,1,Hello\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testFilterInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		table
-			// Must fail. Field foo does not exist.
-			.filter("foo = 17");
-	}
-
-	public static class OldHashCode extends ScalarFunction {
-		public int eval(String s) {
-			return -1;
-		}
-	}
-
-	public static class HashCode extends ScalarFunction {
-		public int eval(String s) {
-			return s.hashCode();
-		}
-	}
-
-	@Test
-	public void testUserDefinedScalarFunction() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		tableEnv.registerFunction("hashCode", new OldHashCode());
-		tableEnv.registerFunction("hashCode", new HashCode());
-
-		DataSource<String> input = env.fromElements("a", "b", "c");
-
-		Table table = tableEnv.fromDataSet(input, "text");
-
-		Table result = table.select("text.hashCode()");
-
-		DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class);
-		List<Integer> results = ds.collect();
-		String expected = "97\n98\n99";
-		compareResultAsText(results, expected);
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
deleted file mode 100644
index b1bb6e8..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends TableProgramsTestBase {
-
-	public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testNumericAutocastInArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, Long, Double>> input =
-				env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select("f0 + 1, f1 +" +
-				" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNumericAutocastInComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input =
-				env.fromElements(
-						new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d),
-						new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a,b,c,d,e,f");
-
-		Table result = table
-				.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,2,2,2.0,2.0";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testCasting() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple4<Integer, Double, Long, Boolean>> input =
-				env.fromElements(new Tuple4<>(1, 0.0, 1L, true));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select(
-				// * -> String
-				"f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)," +
-				// NUMERIC TYPE -> Boolean
-				"f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," +
-				// NUMERIC TYPE -> NUMERIC TYPE
-				"f0.cast(DOUBLE), f1.cast(INT), f2.cast(SHORT)," +
-				// Boolean -> NUMERIC TYPE
-				"f3.cast(DOUBLE)," +
-				// identity casting
-				"f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), f3.cast(BOOL)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,0.0,1,true," +
-			"true,false,true," +
-			"1.0,0,1," +
-			"1.0," +
-			"1,0.0,1,true\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testCastFromString() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<String, String, String>> input =
-				env.fromElements(new Tuple3<>("1", "true", "2.0"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select(
-				"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,1,1,2.0,2.0,true\n";
-		compareResultAsText(results, expected);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
deleted file mode 100644
index a916998..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-
-@RunWith(Parameterized.class)
-public class JoinITCase extends TableProgramsTestBase {
-
-	public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testJoin() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithJoinFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e && a < 6 && h < b").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hello world, how are you?,Hallo Welt wie\n" +
-				"I am fine.,Hallo Welt wie\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithMultipleKeys() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("a === d && b === h").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinNonExistingKey() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Field foo does not exist.
-		in1.join(in2).where("foo === e").select("c, g");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinWithNonMatchingKeyTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2)
-			// Must fail. Types of join fields are not compatible (Integer and String)
-			.where("a === g").select("c, g");
-
-		tableEnv.toDataSet(result, Row.class).collect();
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinWithAmbiguousFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
-
-		// Must fail. Join input have overlapping field names.
-		in1.join(in2).where("a === d").select("c, g");
-	}
-
-	@Test
-	public void testJoinWithAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1
-				.join(in2).where("a === d").select("g.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "6";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinTablesFromDifferentEnvs() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
-		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
-		Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Tables are bound to different TableEnvironments.
-		in1.join(in2).where("a === d").select("g.count");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
index f3f554b..a6e5c56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
@@ -298,7 +298,7 @@ class CalcITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerFunction("hashCode",
-      new org.apache.flink.table.api.java.batch.table.CalcITCase.OldHashCode)
+      org.apache.flink.table.api.scala.batch.table.OldHashCode)
     tEnv.registerFunction("hashCode", MyHashCode)
 
     val ds = env.fromElements("a", "b", "c")

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index a98c258..94c2a5c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
@@ -54,17 +54,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testAggregationOnNonExistingField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      // Must fail. Field 'foo does not exist.
-      .select('foo.avg)
-  }
-
   @Test
   def testWorkingAggregationDataTypes(): Unit = {
 
@@ -142,30 +131,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testNonWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Field '_1 is not a numeric type.
-      .select('_1.sum)
-
-    t.collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoNestedAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Sum aggregation can not be chained.
-      .select('_2.sum.sum)
-  }
-
   @Test
   def testSQLStyleAggregations(): Unit = {
 
@@ -236,30 +201,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
   @Test
   def testGroupedAggregate(): Unit = {
 
@@ -360,9 +301,9 @@ class AggregationsITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-        .select('b, 4 as 'four, 'a)
-        .groupBy('b, 'four)
-        .select('four, 'a.sum)
+      .select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
 
     val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
     val results = t.toDataSet[Row].collect()
@@ -376,11 +317,11 @@ class AggregationsITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-        .groupBy('e, 'b % 3)
-        .select('c.min, 'e, 'a.avg, 'd.count)
+      .groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
 
     val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
+      "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -402,4 +343,3 @@ class AggregationsITCase(
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
index bc4f4bd..164e834 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
@@ -27,11 +27,11 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -110,36 +110,6 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
   @Test
   def testSelectStar(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -158,41 +128,6 @@ class CalcITCase(
   }
 
   @Test
-  def testAliasStarException(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
-      fail("TableException expected")
-    } catch {
-      case _: TableException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-  }
-
-  @Test
   def testAllRejectingFilter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -326,17 +261,6 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testFilterInvalidFieldName(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    // must fail. Field 'foo does not exist
-    ds.filter( 'foo === 2 )
-  }
-
   @Test
   def testSimpleCalc(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -425,6 +349,19 @@ class CalcITCase(
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testUserDefinedScalarFunction() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerFunction("hashCode", OldHashCode)
+    tableEnv.registerFunction("hashCode", HashCode)
+    val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text)
+    val result = table.select("text.hashCode()")
+    val results = result.toDataSet[Row].collect()
+    val expected = "97\n98\n99"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object CalcITCase {
@@ -436,3 +373,11 @@ object CalcITCase {
       Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
   }
 }
+
+object HashCode extends ScalarFunction {
+  def eval(s: String): Int = s.hashCode
+}
+
+object OldHashCode extends ScalarFunction {
+  def eval(s: String): Int = -1
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
new file mode 100644
index 0000000..18d3333
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class CastingITCase {
+
+  @Test
+  def testNumericAutocastInArithmetic() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+        '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testNumericAutocastInComparison() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
+      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
+      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
+
+    val results = table.toDataSet[Row].collect()
+    val expected: String = "2,2,2,2,2.0,2.0"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCasting() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
+      .select(
+        // * -> String
+      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+        // NUMERIC TYPE -> Boolean
+      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+        // NUMERIC TYPE -> NUMERIC TYPE
+      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+        // Boolean -> NUMERIC TYPE
+      '_4.cast(DOUBLE),
+        // identity casting
+      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "1,0.0,1,true," + "true,false,true," +
+      "1.0,0,1," + "1.0," + "1,0.0,1,true\n"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCastFromString() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
+      .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "1,1,1,1,2.0,2.0,true\n"
+    compareResultAsText(results.asJava, expected)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index ce16ada..277db4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -103,80 +103,12 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testJoinNonExistingKey(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'foo does not exist
-      .where('foo === 'e)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'a is Int, and 'g is String
-      .where('a === 'g)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
-
-    ds1.join(ds2)
-      // must fail. Both inputs share the same field 'c
-      .where('a === 'd)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate1(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('d === 'f)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate2(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('a < 'd)
-      .select('c, 'g).collect()
-  }
-
   @Test
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    // use a different table env so that the tmp table ids are the same
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -262,19 +194,6 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testJoinTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.join(ds2).where('b === 'e).select('c, 'g)
-  }
-
   @Test
   def testLeftJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -297,30 +216,6 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testNoJoinCondition(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoEquiJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
-  }
-
   @Test
   def testRightJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -330,7 +225,7 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b = h").select('c, 'g)
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
       "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
@@ -349,7 +244,7 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b < h").select('c, 'g)
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
     val expected = "Hello world,BCD\n"
     val results = joinT.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
index e369250..4e02a98 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -105,44 +105,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentColumnSize(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    // must fail. Union inputs have different column size.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-
   @Test
   def testMinusAll(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -178,19 +140,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testMinusDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Minus inputs have different field types.
-    ds1.minus(ds2)
-  }
-
   @Test
   def testMinusDifferentFieldNames(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -207,19 +156,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testMinusAllTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.minusAll(ds2).select('c)
-  }
-
   @Test
   def testIntersect(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -275,32 +211,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testIntersectWithDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Intersect inputs have different field types.
-    ds1.intersect(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.intersect(ds2).select('c)
-  }
-
   @Test
   def testIntersectWithScalarExpression(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index 3cbc2c8..2991aaa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -172,15 +172,4 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testFetchWithoutOrder(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).limit(0, 5)
-
-    t.toDataSet[Row].collect()
-  }
-
 }