You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/01/10 11:30:27 UTC

[2/2] flink git commit: [FLINK-5084] [table] Replace Java Table API integration tests by unit tests

[FLINK-5084] [table] Replace Java Table API integration tests by unit tests

This closes #2977.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/649cf054
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/649cf054
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/649cf054

Branch: refs/heads/master
Commit: 649cf054e692ea7ffba3125377300fa0496908e7
Parents: 614acc3
Author: mtunique <oa...@gmail.com>
Authored: Fri Dec 9 13:23:36 2016 +0800
Committer: twalthr <tw...@apache.org>
Committed: Tue Jan 10 11:54:23 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/java/batch/ExplainTest.java | 160 --------
 .../java/batch/table/AggregationsITCase.java    | 380 ------------------
 .../table/api/java/batch/table/CalcITCase.java  | 324 ----------------
 .../api/java/batch/table/CastingITCase.java     | 140 -------
 .../table/api/java/batch/table/JoinITCase.java  | 207 ----------
 .../table/api/scala/batch/sql/CalcITCase.scala  |   2 +-
 .../scala/batch/table/AggregationsITCase.scala  |  74 +---
 .../api/scala/batch/table/CalcITCase.scala      | 101 ++---
 .../api/scala/batch/table/CastingITCase.scala   | 104 +++++
 .../api/scala/batch/table/JoinITCase.scala      | 113 +-----
 .../scala/batch/table/SetOperatorsITCase.scala  |  92 +----
 .../api/scala/batch/table/SortITCase.scala      |  11 -
 .../AggregationsStringExpressionTest.scala      | 342 ++++++++++++++++
 .../stringexpr/CalcStringExpressionTest.scala   | 386 +++++++++++++++++++
 .../CastingStringExpressionTest.scala           | 121 ++++++
 .../stringexpr/JoinStringExpressionTest.scala   | 276 +++++++++++++
 .../validation/AggregationsValidationTest.scala | 139 +++++++
 .../table/validation/CalcValidationTest.scala   | 138 +++++++
 .../table/validation/JoinValidationTest.scala   | 188 +++++++++
 .../validation/SetOperatorsValidationTest.scala | 119 ++++++
 .../table/validation/SortValidationTest.scala   |  47 +++
 .../batch/utils/LogicalPlanFormatUtils.scala    |  35 ++
 22 files changed, 1931 insertions(+), 1568 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
deleted file mode 100644
index 114579f..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/ExplainTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-
-import java.io.File;
-import java.util.Scanner;
-
-import static org.junit.Assert.assertEquals;
-
-public class ExplainTest extends MultipleProgramsTestBase {
-
-	public ExplainTest() {
-		super(TestExecutionMode.CLUSTER);
-	}
-
-	private static String testFilePath = ExplainTest.class.getResource("/").getFile();
-
-	@Test
-	public void testFilterWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
-		Table table = tableEnv
-			.fromDataSet(input, "a, b")
-			.filter("a % 2 = 0");
-
-		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testFilter0.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testFilterWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input = env.fromElements(new Tuple2<>(1,"d"));
-		Table table = tableEnv
-			.fromDataSet(input, "a, b")
-			.filter("a % 2 = 0");
-
-		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testFilter1.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testJoinWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "a, b");
-		Table table2 = tableEnv.fromDataSet(input2, "c, d");
-		Table table = table1
-			.join(table2)
-			.where("b = d")
-			.select("a, c");
-
-		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testJoin0.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testJoinWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "a, b");
-		Table table2 = tableEnv.fromDataSet(input2, "c, d");
-		Table table = table1
-			.join(table2)
-			.where("b = d")
-			.select("a, c");
-
-		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testJoin1.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testUnionWithoutExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "count, word");
-		Table table2 = tableEnv.fromDataSet(input2, "count, word");
-		Table table = table1.unionAll(table2);
-
-		String result = tableEnv.explain(table).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testUnion0.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-
-	@Test
-	public void testUnionWithExtended() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple2<Integer, String>> input1 = env.fromElements(new Tuple2<>(1,"d"));
-		DataSet<Tuple2<Integer, String>> input2 = env.fromElements(new Tuple2<>(1,"d"));
-		Table table1 = tableEnv.fromDataSet(input1, "count, word");
-		Table table2 = tableEnv.fromDataSet(input2, "count, word");
-		Table table = table1.unionAll(table2);
-
-		String result = tableEnv.explain(table, true).replaceAll("\\r\\n", "\n");
-		try (Scanner scanner = new Scanner(new File(testFilePath +
-			"../../src/test/scala/resources/testUnion1.out"))){
-			String source = scanner.useDelimiter("\\A").next().replaceAll("\\r\\n", "\n");
-			assertEquals(source, result);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
deleted file mode 100644
index d37ebb5..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/AggregationsITCase.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table.api.java.batch.table;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.operators.Order;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.tuple.Tuple7;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.examples.java.WordCountTable.WC;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class AggregationsITCase extends TableProgramsTestBase {
-
-	public AggregationsITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testAggregationTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-		Table result = table.select("f0.sum, f0.min, f0.max, f0.count, f0.avg");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "231,1,21,21,11";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testAggregationOnNonExistingField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table =
-				tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
-
-		Table result =
-				table.select("foo.avg");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testWorkingAggregationDataTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple7<Byte, Short, Integer, Long, Float, Double, String>> input =
-				env.fromElements(
-						new Tuple7<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, "Hello"),
-						new Tuple7<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d, "Ciao"));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table result =
-				table.select("f0.avg, f1.avg, f2.avg, f3.avg, f4.avg, f5.avg, f6.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,1,1,1.5,1.5,2";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregationWithArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input =
-				env.fromElements(
-						new Tuple2<>(1f, "Hello"),
-						new Tuple2<>(2f, "Ciao"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				table.select("(f0 + 2).avg + 2, f1.count + 5");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "5.5,7";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAggregationWithTwoCount() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input =
-			env.fromElements(
-				new Tuple2<>(1f, "Hello"),
-				new Tuple2<>(2f, "Ciao"));
-
-		Table table =
-			tableEnv.fromDataSet(input);
-
-		Table result =
-			table.select("f0.count, f1.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testNonWorkingDataTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				// Must fail. Cannot compute SUM aggregate on String field.
-				table.select("f1.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testNoNestedAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple2<Float, String>> input = env.fromElements(new Tuple2<>(1f, "Hello"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result =
-				// Must fail. Aggregation on aggregation not allowed.
-				table.select("f0.sum.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGroupingOnNonExistentField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			// must fail. Field foo is not in input
-			.groupBy("foo")
-			.select("a.avg");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testGroupingInvalidSelection() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv
-			.fromDataSet(input, "a, b, c")
-			.groupBy("a, b")
-			// must fail. Field c is not a grouping key or aggregation
-			.select("c");
-	}
-
-	@Test
-	public void testGroupedAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("b, a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1\n" + "2,5\n" + "3,15\n" + "4,34\n" + "5,65\n" + "6,111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupingKeyForwardIfNotUsed() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.groupBy("b").select("a.sum");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "5\n" + "15\n" + "34\n" + "65\n" + "111\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testGroupNoAggregation() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.groupBy("b").select("a.sum as d, b").groupBy("b, d").select("b");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		String expected = "1\n" + "2\n" + "3\n" + "4\n" + "5\n" + "6\n";
-		List<Row> results = ds.collect();
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testPojoAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-		DataSet<WC> input = env.fromElements(
-				new WC("Hello", 1),
-				new WC("Ciao", 1),
-				new WC("Hello", 1),
-				new WC("Hola", 1),
-				new WC("Hola", 1));
-
-		Table table = tableEnv.fromDataSet(input);
-
-		Table filtered = table
-				.groupBy("word")
-				.select("word.count as frequency, word")
-				.filter("frequency = 2");
-
-		List<String> result = tableEnv.toDataSet(filtered, WC.class)
-				.map(new MapFunction<WC, String>() {
-					public String map(WC value) throws Exception {
-						return value.word;
-					}
-				}).collect();
-		String expected = "Hello\n" + "Hola";
-		compareResultAsText(result, expected);
-	}
-
-	@Test
-	public void testPojoGrouping() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Tuple3<String, Double, String>> data = env.fromElements(
-			new Tuple3<>("A", 23.0, "Z"),
-			new Tuple3<>("A", 24.0, "Y"),
-			new Tuple3<>("B", 1.0, "Z"));
-
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		Table table = tableEnv
-			.fromDataSet(data, "groupMe, value, name")
-			.select("groupMe, value, name")
-			.where("groupMe != 'B'");
-
-		DataSet<MyPojo> myPojos = tableEnv.toDataSet(table, MyPojo.class);
-
-		DataSet<MyPojo> result = myPojos.groupBy("groupMe")
-			.sortGroup("value", Order.DESCENDING)
-			.first(1);
-
-		List<MyPojo> resultList = result.collect();
-		compareResultAsText(resultList, "A,24.0,Y");
-	}
-
-	@Test
-	public void testDistinct() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table distinct = table.select("b").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n"+ "4\n"+ "5\n"+ "6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDistinctAfterAggregate() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> input = CollectionDataSets.get5TupleDataSet(env);
-
-		Table table = tableEnv.fromDataSet(input, "a, b, c, d, e");
-
-		Table distinct = table.groupBy("a, e").select("e").distinct();
-
-		DataSet<Row> ds = tableEnv.toDataSet(distinct, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1\n" + "2\n" + "3\n";
-		compareResultAsText(results, expected);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public static class MyPojo implements Serializable {
-		private static final long serialVersionUID = 8741918940120107213L;
-
-		public String groupMe;
-		public double value;
-		public String name;
-
-		public MyPojo() {
-			// for serialization
-		}
-
-		public MyPojo(String groupMe, double value, String name) {
-			this.groupMe = groupMe;
-			this.value = value;
-			this.name = name;
-		}
-
-		@Override
-		public String toString() {
-			return groupMe + "," + value + "," + name;
-		}
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
deleted file mode 100644
index b1ef563..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CalcITCase.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.Arrays;
-import java.util.Collection;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CalcITCase extends TableProgramsTestBase {
-
-	public CalcITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-	public static Collection<Object[]> parameters() {
-		return Arrays.asList(new Object[][] {
-			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT() },
-			{ TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL() }
-		});
-	}
-
-	@Test
-	public void testSimpleSelectAllWithAs() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-				.select("a, b, c");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectWithNaming() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-				.select("f0 as a, f1 as b")
-				.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testSimpleSelectRenameAll() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds);
-
-		Table result = in
-			.select("f0 as a, f1 as b, f2 as c")
-			.select("a, b");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-			"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-			"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testSelectInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1, foo + 2");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testSelectAmbiguousFieldNames() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-
-		tableEnv.fromDataSet(ds, "a, b, c")
-			// Must fail. Field foo does not exist
-			.select("a + 1 as foo, b + 2 as foo");
-	}
-
-	@Test
-	public void testSelectStar() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
-		Table in = tableEnv.fromDataSet(ds, "a,b,c");
-
-		Table result = in
-			.select("*");
-
-		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = resultSet.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-		                  "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-		                  "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-		                  "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-		                  "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-		                  "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-		                  "20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAllRejectingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("false");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testAllPassingFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("true");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
-			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
-			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
-			"20,6,Comment#14\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testFilterOnIntegerTupleField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter(" a % 2 = 0 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNotEquals() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-				.filter("!( a % 2 <> 0 ) ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" + "6,3,Luke Skywalker\n" + "8,4," +
-				"Comment#2\n" + "10,4,Comment#4\n" + "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-				"Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testDisjunctivePreds() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a < 2 || a > 20");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,Hi\n" + "21,6,Comment#15\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testIntegerBiggerThan128() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = env.fromElements(new Tuple3<>(300, 1L, "Hello"));
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		Table result = table
-			.filter("a = 300 ");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "300,1,Hello\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testFilterInvalidField() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
-		Table table = tableEnv.fromDataSet(input, "a, b, c");
-
-		table
-			// Must fail. Field foo does not exist.
-			.filter("foo = 17");
-	}
-
-	public static class OldHashCode extends ScalarFunction {
-		public int eval(String s) {
-			return -1;
-		}
-	}
-
-	public static class HashCode extends ScalarFunction {
-		public int eval(String s) {
-			return s.hashCode();
-		}
-	}
-
-	@Test
-	public void testUserDefinedScalarFunction() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		tableEnv.registerFunction("hashCode", new OldHashCode());
-		tableEnv.registerFunction("hashCode", new HashCode());
-
-		DataSource<String> input = env.fromElements("a", "b", "c");
-
-		Table table = tableEnv.fromDataSet(input, "text");
-
-		Table result = table.select("text.hashCode()");
-
-		DataSet<Integer> ds = tableEnv.toDataSet(result, Integer.class);
-		List<Integer> results = ds.collect();
-		String expected = "97\n98\n99";
-		compareResultAsText(results, expected);
-	}
-
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
deleted file mode 100644
index b1bb6e8..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/CastingITCase.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.tuple.Tuple6;
-import org.apache.flink.api.java.tuple.Tuple8;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class CastingITCase extends TableProgramsTestBase {
-
-	public CastingITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testNumericAutocastInArithmetic() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple8<Byte, Short, Integer, Long, Float, Double, Long, Double>> input =
-				env.fromElements(new Tuple8<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d, 1L, 1001.1));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select("f0 + 1, f1 +" +
-				" 1, f2 + 1L, f3 + 1.0f, f4 + 1.0d, f5 + 1, f6 + 1.0d, f7 + f0");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testNumericAutocastInComparison() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple6<Byte, Short, Integer, Long, Float, Double>> input =
-				env.fromElements(
-						new Tuple6<>((byte) 1, (short) 1, 1, 1L, 1.0f, 1.0d),
-						new Tuple6<>((byte) 2, (short) 2, 2, 2L, 2.0f, 2.0d));
-
-		Table table =
-				tableEnv.fromDataSet(input, "a,b,c,d,e,f");
-
-		Table result = table
-				.filter("a > 1 && b > 1 && c > 1L && d > 1.0f && e > 1.0d && f > 1");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "2,2,2,2,2.0,2.0";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testCasting() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple4<Integer, Double, Long, Boolean>> input =
-				env.fromElements(new Tuple4<>(1, 0.0, 1L, true));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select(
-				// * -> String
-				"f0.cast(STRING), f1.cast(STRING), f2.cast(STRING), f3.cast(STRING)," +
-				// NUMERIC TYPE -> Boolean
-				"f0.cast(BOOL), f1.cast(BOOL), f2.cast(BOOL)," +
-				// NUMERIC TYPE -> NUMERIC TYPE
-				"f0.cast(DOUBLE), f1.cast(INT), f2.cast(SHORT)," +
-				// Boolean -> NUMERIC TYPE
-				"f3.cast(DOUBLE)," +
-				// identity casting
-				"f0.cast(INT), f1.cast(DOUBLE), f2.cast(LONG), f3.cast(BOOL)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,0.0,1,true," +
-			"true,false,true," +
-			"1.0,0,1," +
-			"1.0," +
-			"1,0.0,1,true\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testCastFromString() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
-
-		DataSource<Tuple3<String, String, String>> input =
-				env.fromElements(new Tuple3<>("1", "true", "2.0"));
-
-		Table table =
-				tableEnv.fromDataSet(input);
-
-		Table result = table.select(
-				"f0.cast(BYTE), f0.cast(SHORT), f0.cast(INT), f0.cast(LONG), f2.cast(DOUBLE), f2.cast(FLOAT), f1.cast(BOOL)");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "1,1,1,1,2.0,2.0,true\n";
-		compareResultAsText(results, expected);
-	}
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
deleted file mode 100644
index a916998..0000000
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api.java.batch.table;
-
-import java.util.List;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-
-@RunWith(Parameterized.class)
-public class JoinITCase extends TableProgramsTestBase {
-
-	public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){
-		super(mode, configMode);
-	}
-
-	@Test
-	public void testJoin() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithJoinFilter() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("b === e && a < 6 && h < b").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hello world, how are you?,Hallo Welt wie\n" +
-				"I am fine.,Hallo Welt wie\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test
-	public void testJoinWithMultipleKeys() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2).where("a === d && b === h").select("c, g");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
-				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinNonExistingKey() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Field foo does not exist.
-		in1.join(in2).where("foo === e").select("c, g");
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinWithNonMatchingKeyTypes() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1.join(in2)
-			// Must fail. Types of join fields are not compatible (Integer and String)
-			.where("a === g").select("c, g");
-
-		tableEnv.toDataSet(result, Row.class).collect();
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinWithAmbiguousFields() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
-
-		// Must fail. Join input have overlapping field names.
-		in1.join(in2).where("a === d").select("c, g");
-	}
-
-	@Test
-	public void testJoinWithAggregation() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
-		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
-
-		Table result = in1
-				.join(in2).where("a === d").select("g.count");
-
-		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
-		List<Row> results = ds.collect();
-		String expected = "6";
-		compareResultAsText(results, expected);
-	}
-
-	@Test(expected = ValidationException.class)
-	public void testJoinTablesFromDifferentEnvs() {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
-		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
-
-		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
-		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
-
-		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
-		Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h");
-
-		// Must fail. Tables are bound to different TableEnvironments.
-		in1.join(in2).where("a === d").select("g.count");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
index f3f554b..a6e5c56 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/CalcITCase.scala
@@ -298,7 +298,7 @@ class CalcITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     tEnv.registerFunction("hashCode",
-      new org.apache.flink.table.api.java.batch.table.CalcITCase.OldHashCode)
+      org.apache.flink.table.api.scala.batch.table.OldHashCode)
     tEnv.registerFunction("hashCode", MyHashCode)
 
     val ds = env.fromElements("a", "b", "c")

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
index a98c258..94c2a5c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.examples.scala.WordCountTable.{WC => MyWC}
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
@@ -54,17 +54,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testAggregationOnNonExistingField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-      // Must fail. Field 'foo does not exist.
-      .select('foo.avg)
-  }
-
   @Test
   def testWorkingAggregationDataTypes(): Unit = {
 
@@ -142,30 +131,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testNonWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Field '_1 is not a numeric type.
-      .select('_1.sum)
-
-    t.collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoNestedAggregations(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = env.fromElements(("Hello", 1)).toTable(tEnv)
-      // Must fail. Sum aggregation can not be chained.
-      .select('_2.sum.sum)
-  }
-
   @Test
   def testSQLStyleAggregations(): Unit = {
 
@@ -236,30 +201,6 @@ class AggregationsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testGroupingOnNonExistentField(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. '_foo not a valid field
-      .groupBy('_foo)
-      .select('a.avg)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testGroupingInvalidSelection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      .groupBy('a, 'b)
-      // must fail. 'c is not a grouping key or aggregation
-      .select('c)
-  }
-
   @Test
   def testGroupedAggregate(): Unit = {
 
@@ -360,9 +301,9 @@ class AggregationsITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-        .select('b, 4 as 'four, 'a)
-        .groupBy('b, 'four)
-        .select('four, 'a.sum)
+      .select('b, 4 as 'four, 'a)
+      .groupBy('b, 'four)
+      .select('four, 'a.sum)
 
     val expected = "4,1\n" + "4,5\n" + "4,15\n" + "4,34\n" + "4,65\n" + "4,111\n"
     val results = t.toDataSet[Row].collect()
@@ -376,11 +317,11 @@ class AggregationsITCase(
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
 
     val t = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-        .groupBy('e, 'b % 3)
-        .select('c.min, 'e, 'a.avg, 'd.count)
+      .groupBy('e, 'b % 3)
+      .select('c.min, 'e, 'a.avg, 'd.count)
 
     val expected = "0,1,1,1\n" + "3,2,3,3\n" + "7,1,4,2\n" + "14,2,5,1\n" +
-        "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
+      "5,3,4,2\n" + "2,1,3,2\n" + "1,2,3,3\n" + "12,3,5,1"
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
@@ -402,4 +343,3 @@ class AggregationsITCase(
   }
 
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
index bc4f4bd..164e834 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CalcITCase.scala
@@ -27,11 +27,11 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.expressions.Literal
+import org.apache.flink.table.functions.ScalarFunction
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Assert._
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
@@ -110,36 +110,6 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testSelectInvalidFieldFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. Field 'foo does not exist
-      .select('a, 'foo)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'foo
-      .select('a + 1 as 'foo, 'b + 2 as 'foo).toDataSet[Row].print()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testSelectAmbiguousRenaming2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-      // must fail. 'a and 'b are both renamed to 'a
-      .select('a, 'b as 'a).toDataSet[Row].print()
-  }
-
   @Test
   def testSelectStar(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -158,41 +128,6 @@ class CalcITCase(
   }
 
   @Test
-  def testAliasStarException(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, '*, 'b, 'c)
-      fail("TableException expected")
-    } catch {
-      case _: TableException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-        .select('_1 as '*, '_2 as 'b, '_1 as 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('*, 'b, 'c)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-
-    try {
-      CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c).select('*, 'b)
-      fail("ValidationException expected")
-    } catch {
-      case _: ValidationException => //ignore
-    }
-  }
-
-  @Test
   def testAllRejectingFilter(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
@@ -326,17 +261,6 @@ class CalcITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testFilterInvalidFieldName(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-
-    // must fail. Field 'foo does not exist
-    ds.filter( 'foo === 2 )
-  }
-
   @Test
   def testSimpleCalc(): Unit = {
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -425,6 +349,19 @@ class CalcITCase(
     val results = t.toDataSet[Row].collect()
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testUserDefinedScalarFunction() {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env, config)
+    tableEnv.registerFunction("hashCode", OldHashCode)
+    tableEnv.registerFunction("hashCode", HashCode)
+    val table = env.fromElements("a", "b", "c").toTable(tableEnv, 'text)
+    val result = table.select("text.hashCode()")
+    val results = result.toDataSet[Row].collect()
+    val expected = "97\n98\n99"
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
 }
 
 object CalcITCase {
@@ -436,3 +373,11 @@ object CalcITCase {
       Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
   }
 }
+
+object HashCode extends ScalarFunction {
+  def eval(s: String): Int = s.hashCode
+}
+
+object OldHashCode extends ScalarFunction {
+  def eval(s: String): Int = -1
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
new file mode 100644
index 0000000..18d3333
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/CastingITCase.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.scala.batch.table
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.Types._
+import org.apache.flink.test.util.TestBaseUtils.compareResultAsText
+import org.apache.flink.types.Row
+import org.junit._
+
+import scala.collection.JavaConverters._
+
+class CastingITCase {
+
+  @Test
+  def testNumericAutocastInArithmetic() {
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d, 1L, 1001.1)).toTable(tableEnv)
+      .select('_1 + 1, '_2 + 1, '_3 + 1L, '_4 + 1.0f,
+        '_5 + 1.0d, '_6 + 1, '_7 + 1.0d, '_8 + '_1)
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "2,2,2,2.0,2.0,2.0,2.0,1002.1"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testNumericAutocastInComparison() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+
+    val table = env.fromElements(
+      (1.toByte, 1.toShort, 1, 1L, 1.0f, 1.0d),
+      (2.toByte, 2.toShort, 2, 2L, 2.0f, 2.0d))
+      .toTable(tableEnv, 'a, 'b, 'c, 'd, 'e, 'f)
+      .filter('a > 1 && 'b > 1 && 'c > 1L && 'd > 1.0f && 'e > 1.0d && 'f > 1)
+
+    val results = table.toDataSet[Row].collect()
+    val expected: String = "2,2,2,2,2.0,2.0"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCasting() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements((1, 0.0, 1L, true)).toTable(tableEnv)
+      .select(
+        // * -> String
+      '_1.cast(STRING), '_2.cast(STRING), '_3.cast(STRING), '_4.cast(STRING),
+        // NUMERIC TYPE -> Boolean
+      '_1.cast(BOOLEAN), '_2.cast(BOOLEAN), '_3.cast(BOOLEAN),
+        // NUMERIC TYPE -> NUMERIC TYPE
+      '_1.cast(DOUBLE), '_2.cast(INT), '_3.cast(SHORT),
+        // Boolean -> NUMERIC TYPE
+      '_4.cast(DOUBLE),
+        // identity casting
+      '_1.cast(INT), '_2.cast(DOUBLE), '_3.cast(LONG), '_4.cast(BOOLEAN))
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "1,0.0,1,true," + "true,false,true," +
+      "1.0,0,1," + "1.0," + "1,0.0,1,true\n"
+    compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  @throws[Exception]
+  def testCastFromString() {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val tableEnv = TableEnvironment.getTableEnvironment(env)
+    val table = env.fromElements(("1", "true", "2.0")).toTable(tableEnv)
+      .select('_1.cast(BYTE), '_1.cast(SHORT), '_1.cast(INT), '_1.cast(LONG),
+        '_3.cast(DOUBLE), '_3.cast(FLOAT), '_2.cast(BOOLEAN))
+
+    val results = table.toDataSet[Row].collect()
+    val expected = "1,1,1,1,2.0,2.0,true\n"
+    compareResultAsText(results.asJava, expected)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
index ce16ada..277db4c 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/JoinITCase.scala
@@ -103,80 +103,12 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testJoinNonExistingKey(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'foo does not exist
-      .where('foo === 'e)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithNonMatchingKeyTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. Field 'a is Int, and 'g is String
-      .where('a === 'g)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testJoinWithAmbiguousFields(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'c)
-
-    ds1.join(ds2)
-      // must fail. Both inputs share the same field 'c
-      .where('a === 'd)
-      .select('c, 'g)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate1(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('d === 'f)
-      .select('c, 'g).collect()
-  }
-
-  @Test(expected = classOf[TableException])
-  def testNoEqualityJoinPredicate2(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    ds1.join(ds2)
-      // must fail. No equality join predicate
-      .where('a < 'd)
-      .select('c, 'g).collect()
-  }
-
   @Test
   def testJoinWithAggregation(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
     val tEnv = TableEnvironment.getTableEnvironment(env, config)
+    // use different table env in order to let tmp table ids are the same
+    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
 
     val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
@@ -262,19 +194,6 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testJoinTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv2, 'd, 'e, 'f, 'g, 'h)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.join(ds2).where('b === 'e).select('c, 'g)
-  }
-
   @Test
   def testLeftJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -297,30 +216,6 @@ class JoinITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testNoJoinCondition(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b === 'd && 'b < 3).select('c, 'g)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testNoEquiJoin(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    tEnv.getConfig.setNullCheck(true)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
-
-    val joinT = ds2.leftOuterJoin(ds1, 'b < 'd).select('c, 'g)
-  }
-
   @Test
   def testRightJoinWithMultipleKeys(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -330,7 +225,7 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b = h").select('c, 'g)
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b === 'h).select('c, 'g)
 
     val expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "null,Hallo Welt wie\n" +
       "Hello world,Hallo Welt wie gehts?\n" + "Hello world,ABC\n" + "null,BCD\n" + "null,CDE\n" +
@@ -349,7 +244,7 @@ class JoinITCase(
     val ds1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
     val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'd, 'e, 'f, 'g, 'h)
 
-    val joinT = ds1.rightOuterJoin(ds2, "a = d && b < h").select('c, 'g)
+    val joinT = ds1.rightOuterJoin(ds2, 'a === 'd && 'b < 'h).select('c, 'g)
 
     val expected = "Hello world,BCD\n"
     val results = joinT.toDataSet[Row].collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
index e369250..4e02a98 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SetOperatorsITCase.scala
@@ -24,7 +24,7 @@ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableC
 import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
 import org.apache.flink.types.Row
-import org.apache.flink.table.api.{TableEnvironment, ValidationException}
+import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
 import org.apache.flink.test.util.TestBaseUtils
 import org.junit._
@@ -105,44 +105,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentColumnSize(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'd, 'c, 'e)
-
-    // must fail. Union inputs have different column size.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Union inputs have different field types.
-    ds1.unionAll(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testUnionTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.unionAll(ds2).select('c)
-  }
-
   @Test
   def testMinusAll(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -178,19 +140,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testMinusDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Minus inputs have different field types.
-    ds1.minus(ds2)
-  }
-
   @Test
   def testMinusDifferentFieldNames(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -207,19 +156,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testMinusAllTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.minusAll(ds2).select('c)
-  }
-
   @Test
   def testIntersect(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
@@ -275,32 +211,6 @@ class SetOperatorsITCase(
     TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testIntersectWithDifferentFieldTypes(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c, 'd, 'e)
-      .select('a, 'b, 'c)
-
-    // must fail. Intersect inputs have different field types.
-    ds1.intersect(ds2)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testIntersectTablesFromDifferentEnvs(): Unit = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv1, 'a, 'b, 'c)
-    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv2, 'a, 'b, 'c)
-
-    // Must fail. Tables are bound to different TableEnvironments.
-    ds1.intersect(ds2).select('c)
-  }
-
   @Test
   def testIntersectWithScalarExpression(): Unit = {
     val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

http://git-wip-us.apache.org/repos/asf/flink/blob/649cf054/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
index 3cbc2c8..2991aaa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/SortITCase.scala
@@ -172,15 +172,4 @@ class SortITCase(
     TestBaseUtils.compareOrderedResultAsText(result.asJava, expected)
   }
 
-  @Test(expected = classOf[ValidationException])
-  def testFetchWithoutOrder(): Unit = {
-    val env = getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    val t = ds.toTable(tEnv).limit(0, 5)
-
-    t.toDataSet[Row].collect()
-  }
-
 }