You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2016/12/16 15:46:42 UTC

[13/47] flink git commit: [FLINK-4704] [table] Refactor package structure of flink-table.

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
new file mode 100644
index 0000000..a916998
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/table/JoinITCase.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.batch.table;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+
+@RunWith(Parameterized.class)
+public class JoinITCase extends TableProgramsTestBase {
+
+	public JoinITCase(TestExecutionMode mode, TableConfigMode configMode){
+		super(mode, configMode);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("b === e").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoinWithFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("b === e && b < 2").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoinWithJoinFilter() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("b === e && a < 6 && h < b").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hello world, how are you?,Hallo Welt wie\n" +
+				"I am fine.,Hallo Welt wie\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoinWithMultipleKeys() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.get3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2).where("a === d && b === h").select("c, g");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt wie gehts?\n" +
+				"Hello world,ABC\n" + "I am fine.,HIJ\n" + "I am fine.,IJK\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testJoinNonExistingKey() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		// Must fail. Field foo does not exist.
+		in1.join(in2).where("foo === e").select("c, g");
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testJoinWithNonMatchingKeyTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1.join(in2)
+			// Must fail. Types of join fields are not compatible (Integer and String)
+			.where("a === g").select("c, g");
+
+		tableEnv.toDataSet(result, Row.class).collect();
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testJoinWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, c");
+
+		// Must fail. Join input have overlapping field names.
+		in1.join(in2).where("a === d").select("c, g");
+	}
+
+	@Test
+	public void testJoinWithAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tableEnv.fromDataSet(ds1, "a, b, c");
+		Table in2 = tableEnv.fromDataSet(ds2, "d, e, f, g, h");
+
+		Table result = in1
+				.join(in2).where("a === d").select("g.count");
+
+		DataSet<Row> ds = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "6";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = ValidationException.class)
+	public void testJoinTablesFromDifferentEnvs() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tEnv1 = TableEnvironment.getTableEnvironment(env);
+		BatchTableEnvironment tEnv2 = TableEnvironment.getTableEnvironment(env);
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		Table in1 = tEnv1.fromDataSet(ds1, "a, b, c");
+		Table in2 = tEnv2.fromDataSet(ds2, "d, e, f, g, h");
+
+		// Must fail. Tables are bound to different TableEnvironments.
+		in1.join(in2).where("a === d").select("g.count");
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
new file mode 100644
index 0000000..7c01d2b
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.sql;
+
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.types.Row;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class SqlITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void testSelect() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds = StreamTestData.getSmall3TupleDataSet(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT * FROM MyTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testFilter() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = StreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
+
+		String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,1");
+		expected.add("2,2,2");
+		expected.add("2,3,1");
+		expected.add("3,4,2");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testUnion() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds1 = StreamTestData.getSmall3TupleDataSet(env);
+		Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
+		tableEnv.registerTable("T1", t1);
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = StreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
+
+		String sqlQuery = "SELECT * FROM T1 " +
+							"UNION ALL " +
+							"(SELECT a, b, c FROM T2 WHERE a	< 3)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toDataStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+		expected.add("1,1,Hallo");
+		expected.add("2,2,Hallo Welt");
+		expected.add("2,3,Hallo Welt wie");
+
+		StreamITCase.compareWithList(expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
new file mode 100644
index 0000000..139801f
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.java.stream.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class StreamTestData {
+
+	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		return env.fromCollection(data);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
deleted file mode 100644
index 9d00dda..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ExplainTest.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase
-import org.junit.Assert.assertEquals
-import org.junit._
-
-class ExplainTest
-  extends MultipleProgramsTestBase(MultipleProgramsTestBase.TestExecutionMode.CLUSTER) {
-
-  val testFilePath = ExplainTest.this.getClass.getResource("/").getFile
-
-  @Test
-  def testFilterWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testFilterWithExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table = env.fromElements((1, "hello"))
-      .toTable(tEnv, 'a, 'b)
-      .filter("a % 2 = 0")
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testJoinWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testJoinWithExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'a, 'b)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'c, 'd)
-    val table = table1.join(table2).where("b = d").select("a, c")
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testJoin1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(source, result)
-  }
-
-  @Test
-  def testUnionWithoutExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
-  }
-
-  @Test
-  def testUnionWithExtended() : Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env)
-
-    val table1 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table2 = env.fromElements((1, "hello")).toTable(tEnv, 'count, 'word)
-    val table = table1.unionAll(table2)
-
-    val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
-    val source = scala.io.Source.fromFile(testFilePath +
-      "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
deleted file mode 100644
index ddea3ba..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/ProjectableTableSourceITCase.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch
-
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.sources.{BatchTableSource, ProjectableTableSource}
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.apache.flink.types.Row
-import org.junit.{Before, Test}
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ProjectableTableSourceITCase(mode: TestExecutionMode,
-  configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  private val tableName = "MyTable"
-  private var tableEnv: BatchTableEnvironment = null
-
-  @Before
-  def initTableEnv(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    tableEnv = TableEnvironment.getTableEnvironment(env, config)
-    tableEnv.registerTableSource(tableName, new TestProjectableTableSource)
-  }
-
-  @Test
-  def testTableAPI(): Unit = {
-    val results = tableEnv
-                  .scan(tableName)
-                  .where("amount < 4")
-                  .select("id, name")
-                  .collect()
-
-    val expected = Seq(
-      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
-      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test
-  def testSQL(): Unit = {
-    val results = tableEnv
-                  .sql(s"select id, name from $tableName where amount < 4 ")
-                  .collect()
-
-    val expected = Seq(
-      "0,Record_0", "1,Record_1", "2,Record_2", "3,Record_3", "16,Record_16",
-      "17,Record_17", "18,Record_18", "19,Record_19", "32,Record_32").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
-
-class TestProjectableTableSource(
-  fieldTypes: Array[TypeInformation[_]],
-  fieldNames: Array[String])
-  extends BatchTableSource[Row] with ProjectableTableSource[Row] {
-
-  def this() = this(
-    fieldTypes = Array(
-      BasicTypeInfo.STRING_TYPE_INFO,
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.DOUBLE_TYPE_INFO),
-    fieldNames = Array[String]("name", "id", "amount", "price")
-  )
-
-  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
-  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
-    execEnv.fromCollection(generateDynamicCollection(33, fieldNames).asJava, getReturnType)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = fieldNames
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = fieldNames.length
-
-  override def projectFields(fields: Array[Int]): TestProjectableTableSource = {
-    val projectedFieldTypes = new Array[TypeInformation[_]](fields.length)
-    val projectedFieldNames = new Array[String](fields.length)
-
-    fields.zipWithIndex.foreach(f => {
-      projectedFieldTypes(f._2) = fieldTypes(f._1)
-      projectedFieldNames(f._2) = fieldNames(f._1)
-    })
-    new TestProjectableTableSource(projectedFieldTypes, projectedFieldNames)
-  }
-
-  private def generateDynamicCollection(num: Int, fieldNames: Array[String]): Seq[Row] = {
-    for {cnt <- 0 until num}
-      yield {
-        val row = new Row(fieldNames.length)
-        fieldNames.zipWithIndex.foreach(
-          f =>
-            f._1 match {
-              case "name" =>
-                row.setField(f._2, "Record_" + cnt)
-              case "id" =>
-                row.setField(f._2, cnt.toLong)
-              case "amount" =>
-                row.setField(f._2, cnt.toInt % 16)
-              case "price" =>
-                row.setField(f._2, cnt.toDouble / 3)
-              case _ =>
-                throw new IllegalArgumentException(s"unknown field name $f._1")
-            }
-        )
-        row
-      }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
deleted file mode 100644
index b7c8bc0..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableEnvironmentITCase.scala
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch
-
-import java.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.{TableEnvironment, TableException, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableEnvironmentITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSimpleRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds)
-    val t = tEnv.scan(tableName).select('_1, '_2, '_3)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testRegisterWithFields(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet(tableName, ds, 'a, 'b, 'c)
-    val t = tEnv.scan(tableName).select('a, 'b)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" +
-      "7,4\n" + "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingDataSet(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val ds1 = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds1)
-    val ds2 = CollectionDataSets.get5TupleDataSet(env)
-    // Must fail. Name is already in use.
-    tEnv.registerDataSet("MyTable", ds2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testScanUnregisteredTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    // Must fail. No table registered under that name.
-    tEnv.scan("someTable")
-  }
-
-  @Test
-  def testTableRegister(): Unit = {
-
-    val tableName = "MyTable"
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv, 'a, 'b, 'c)
-    tEnv.registerTable(tableName, t)
-
-    val regT = tEnv.scan(tableName).select('a, 'b).filter('a > 8)
-
-    val expected = "9,4\n" + "10,4\n" +
-      "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" +
-      "15,5\n" + "16,6\n" + "17,6\n" + "18,6\n" +
-      "19,6\n" + "20,6\n" + "21,6\n"
-
-    val results = regT.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterExistingTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", t1)
-    val t2 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv)
-    // Must fail. Name is already in use.
-    tEnv.registerDataSet("MyTable", t2)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testRegisterTableFromOtherEnv(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv1 = TableEnvironment.getTableEnvironment(env, config)
-    val tEnv2 = TableEnvironment.getTableEnvironment(env, config)
-
-    val t1 = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv1)
-    // Must fail. Table is bound to different TableEnvironment.
-    tEnv2.registerTable("MyTable", t1)
-  }
-
-  @Test
-  def testToTable(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val t = CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a, 'b, 'c)
-      .select('a, 'b, 'c)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "Peter,28,4000.0,Sales\n" +
-      "Anna,56,10000.0,Engineering\n" +
-      "Lucy,42,6000.0,HR\n"
-    val results = t.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testToTableFromAndToCaseClass(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val data = List(
-      SomeCaseClass("Peter", 28, 4000.00, "Sales"),
-      SomeCaseClass("Anna", 56, 10000.00, "Engineering"),
-      SomeCaseClass("Lucy", 42, 6000.00, "HR"))
-
-    val t =  env.fromCollection(data)
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-      .select('a, 'b, 'c, 'd)
-
-    val expected: String =
-      "SomeCaseClass(Peter,28,4000.0,Sales)\n" +
-      "SomeCaseClass(Anna,56,10000.0,Engineering)\n" +
-      "SomeCaseClass(Lucy,42,6000.0,HR)\n"
-    val results = t.toDataSet[SomeCaseClass].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToFewFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithToManyFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Number of fields does not match.
-      .toTable(tEnv, 'a, 'b, 'c, 'd)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithAmbiguousFields(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    CollectionDataSets.get3TupleDataSet(env)
-      // Must fail. Field names not unique.
-      .toTable(tEnv, 'a, 'b, 'b)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference1(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a + 1, 'b, 'c)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testToTableWithNonFieldReference2(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    // Must fail. as() can only have field references
-    CollectionDataSets.get3TupleDataSet(env)
-      .toTable(tEnv, 'a as 'foo, 'b, 'c)
-  }
-}
-
-object TableEnvironmentITCase {
-
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.EFFICIENT)).asJava
-  }
-}
-
-case class SomeCaseClass(name: String, age: Int, salary: Double, department: String) {
-  def this() { this("", 0, 0.0, "") }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
deleted file mode 100644
index d7e99d4..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSinkITCase.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch
-
-import java.io.File
-
-import org.apache.flink.api.scala.{ExecutionEnvironment, _}
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.api.table.sinks.CsvTableSink
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-
-@RunWith(classOf[Parameterized])
-class TableSinkITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testBatchTableSink(): Unit = {
-
-    val tmpFile = File.createTempFile("flink-table-sink-test", ".tmp")
-    tmpFile.deleteOnExit()
-    val path = tmpFile.toURI.toString
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-    env.setParallelism(4)
-
-    val input = CollectionDataSets.get3TupleDataSet(env)
-      .map(x => x).setParallelism(4) // increase DOP to 4
-
-    val results = input.toTable(tEnv, 'a, 'b, 'c)
-      .where('a < 5 || 'a > 17)
-      .select('c, 'b)
-      .writeToSink(new CsvTableSink(path, fieldDelim = "|"))
-
-    env.execute()
-
-    val expected = Seq(
-      "Hi|1", "Hello|2", "Hello world|2", "Hello world, how are you?|3",
-      "Comment#12|6", "Comment#13|6", "Comment#14|6", "Comment#15|6").mkString("\n")
-
-    TestBaseUtils.compareResultsByLinesInMemory(expected, path)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
deleted file mode 100644
index b5c8ada..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/TableSourceITCase.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch
-
-import java.io.{File, FileOutputStream, OutputStreamWriter}
-
-import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.{DataSet => JavaSet, ExecutionEnvironment => JavaExecEnv}
-import org.apache.flink.api.scala.ExecutionEnvironment
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.table.sources.{BatchTableSource, CsvTableSource}
-import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.api.table.TableEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit.Test
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class TableSourceITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testBatchTableSourceTableAPI(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
-    val results = tEnv
-      .scan("MyTestTable")
-      .where('amount < 4)
-      .select('amount * 'id, 'name)
-      .collect()
-
-    val expected = Seq(
-      "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
-      "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testBatchTableSourceSQL(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerTableSource("MyTestTable", new TestBatchTableSource())
-    val results = tEnv.sql(
-      "SELECT amount * id, name FROM MyTestTable WHERE amount < 4").collect()
-
-    val expected = Seq(
-      "0,Record_0", "0,Record_16", "0,Record_32", "1,Record_1", "17,Record_17",
-      "36,Record_18", "4,Record_2", "57,Record_19", "9,Record_3").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testCsvTableSource(): Unit = {
-
-    val csvRecords = Seq(
-      "First#Id#Score#Last",
-      "Mike#1#12.3#Smith",
-      "Bob#2#45.6#Taylor",
-      "Sam#3#7.89#Miller",
-      "Peter#4#0.12#Smith",
-      "% Just a comment",
-      "Liz#5#34.5#Williams",
-      "Sally#6#6.78#Miller",
-      "Alice#7#90.1#Smith",
-      "Kelly#8#2.34#Williams"
-    )
-
-    val tempFile = File.createTempFile("csv-test", "tmp")
-    tempFile.deleteOnExit()
-    val tmpWriter = new OutputStreamWriter(new FileOutputStream(tempFile), "UTF-8")
-    tmpWriter.write(csvRecords.mkString("$"))
-    tmpWriter.close()
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val csvTable = new CsvTableSource(
-      tempFile.getAbsolutePath,
-      Array("first", "id", "score", "last"),
-      Array(
-        BasicTypeInfo.STRING_TYPE_INFO,
-        BasicTypeInfo.INT_TYPE_INFO,
-        BasicTypeInfo.DOUBLE_TYPE_INFO,
-        BasicTypeInfo.STRING_TYPE_INFO
-      ),
-      fieldDelim = "#",
-      rowDelim = "$",
-      ignoreFirstLine = true,
-      ignoreComments = "%"
-    )
-
-    tEnv.registerTableSource("csvTable", csvTable)
-    val results = tEnv.sql(
-      "SELECT last, sum(score), max(id) FROM csvTable GROUP BY last").collect()
-
-    val expected = Seq(
-      "Smith,102.52,7",
-      "Taylor,45.6,2",
-      "Miller,14.67,6",
-      "Williams,36.84,8").mkString("\n")
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-}
-
-class TestBatchTableSource extends BatchTableSource[Row] {
-
-  val fieldTypes: Array[TypeInformation[_]] = Array(
-    BasicTypeInfo.STRING_TYPE_INFO,
-    BasicTypeInfo.LONG_TYPE_INFO,
-    BasicTypeInfo.INT_TYPE_INFO
-  )
-
-  /** Returns the data of the table as a [[org.apache.flink.api.java.DataSet]]. */
-  override def getDataSet(execEnv: JavaExecEnv): JavaSet[Row] = {
-    execEnv.createInput(new GeneratingInputFormat(33), getReturnType).setParallelism(1)
-  }
-
-  /** Returns the types of the table fields. */
-  override def getFieldTypes: Array[TypeInformation[_]] = fieldTypes
-
-  /** Returns the names of the table fields. */
-  override def getFieldsNames: Array[String] = Array("name", "id", "amount")
-
-  /** Returns the [[TypeInformation]] for the return type. */
-  override def getReturnType: TypeInformation[Row] = new RowTypeInfo(fieldTypes: _*)
-
-  /** Returns the number of fields of the table. */
-  override def getNumberOfFields: Int = 3
-}
-
-class GeneratingInputFormat(val num: Int) extends GenericInputFormat[Row] {
-
-  var cnt = 0L
-
-  override def reachedEnd(): Boolean = cnt >= num
-
-  override def nextRecord(reuse: Row): Row = {
-    reuse.setField(0, s"Record_$cnt")
-    reuse.setField(1, cnt)
-    reuse.setField(2, (cnt % 16).toInt)
-    cnt += 1
-    reuse
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
deleted file mode 100644
index d5d46ba..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/AggregationsITCase.scala
+++ /dev/null
@@ -1,301 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.table.{TableException, TableEnvironment}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class AggregationsITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testAggregationTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT sum(_1), min(_1), max(_1), count(_1), avg(_1) FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "231,1,21,21,11"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT sum(_1) FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "231"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDataSetAggregation(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT sum(_1) FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "231"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery =
-      "SELECT avg(_1), avg(_2), avg(_3), avg(_4), avg(_5), avg(_6), count(_7), " +
-      "  sum(CAST(_6 AS DECIMAL))" +
-      "FROM MyTable"
-
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao"))
-    tEnv.registerDataSet("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,1,1,1.5,1.5,2,3.0"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableWorkingAggregationDataTypes(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT avg(a), avg(b), avg(c), avg(d), avg(e), avg(f), count(g)" +
-      "FROM MyTable"
-
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,1,1,1.5,1.5,2"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT avg(a), sum(a), count(a), avg(b), sum(b) " +
-      "FROM MyTable"
-
-    val ds = env.fromElements((1: Byte, 1: Short), (2: Byte, 2: Short)).toTable(tEnv, 'a, 'b)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,3,2,1,3"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testTableAggregationWithArithmetic(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT avg(a + 2) + 2, count(b) + 5 " +
-      "FROM MyTable"
-
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv, 'a, 'b)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "5.5,7"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAggregationWithTwoCount(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT count(_1), count(_2) FROM MyTable"
-
-    val ds = env.fromElements((1f, "Hello"), (2f, "Ciao")).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "2,2"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-
-  @Test
-  def testAggregationAfterProjection(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT avg(a), sum(b), count(c) FROM " +
-      "(SELECT _1 as a, _2 as b, _3 as c FROM MyTable)"
-
-    val ds = env.fromElements(
-      (1: Byte, 1: Short, 1, 1L, 1.0f, 1.0d, "Hello"),
-      (2: Byte, 2: Short, 2, 2L, 2.0f, 2.0d, "Ciao")).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,3,2"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[TableException])
-  def testDistinctAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT sum(_1) as a, count(distinct _3) as b FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds)
-
-    // must fail. distinct aggregates are not supported
-    tEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGroupedDistinctAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT _2, avg(distinct _1) as a, count(_3) as b FROM MyTable GROUP BY _2"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds)
-
-    // must fail. distinct aggregates are not supported
-    tEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test(expected = classOf[TableException])
-  def testGroupingSetAggregate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT _2, _3, avg(_1) as a FROM MyTable GROUP BY GROUPING SETS (_2, _3)"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds)
-
-    // must fail. grouping sets are not supported
-    tEnv.sql(sqlQuery).toDataSet[Row]
-  }
-
-  @Test
-  def testAggregateEmptyDataSets(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT avg(a), sum(a), count(b) " +
-      "FROM MyTable where a = 4 group by a"
-
-    val sqlQuery2 = "SELECT avg(a), sum(a), count(b) " +
-      "FROM MyTable where a = 4"
-
-    val sqlQuery3 = "SELECT avg(a), sum(a), count(b) " +
-      "FROM MyTable"
-
-    val ds = env.fromElements(
-      (1: Byte, 1: Short),
-      (2: Byte, 2: Short))
-      .toTable(tEnv, 'a, 'b)
-
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-    val result2 = tEnv.sql(sqlQuery2)
-    val result3 = tEnv.sql(sqlQuery3)
-
-    val results = result.toDataSet[Row].collect()
-    val expected = Seq.empty
-    val results2 =  result2.toDataSet[Row].collect()
-    val expected2 = "null,null,0"
-    val results3 = result3.toDataSet[Row].collect()
-    val expected3 = "1,3,2"
-
-    assert(results.equals(expected),
-      "Empty result is expected for grouped set, but actual: " + results)
-    TestBaseUtils.compareResultAsText(results2.asJava, expected2)
-    TestBaseUtils.compareResultAsText(results3.asJava, expected3)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ffe9ec8e/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
deleted file mode 100644
index 5037469..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/api/scala/batch/sql/CalcITCase.scala
+++ /dev/null
@@ -1,329 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.scala.batch.sql
-
-
-import java.sql.{Date, Time, Timestamp}
-import java.util
-
-import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.batch.sql.FilterITCase.MyHashCode
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase
-import org.apache.flink.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.api.scala.table._
-import org.apache.flink.api.scala.util.CollectionDataSets
-import org.apache.flink.api.table.functions.ScalarFunction
-import org.apache.flink.api.table.{TableEnvironment, ValidationException}
-import org.apache.flink.types.Row
-import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
-import org.apache.flink.test.util.TestBaseUtils
-import org.junit._
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-
-import scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class CalcITCase(
-    mode: TestExecutionMode,
-    configMode: TableConfigMode)
-  extends TableProgramsTestBase(mode, configMode) {
-
-  @Test
-  def testSelectStarFromTable(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSelectStarFromDataSet(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env)
-    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSimpleSelectAll(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, b, c FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
-      "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
-      "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
-      "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" +
-      "15,5,Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" +
-      "19,6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testSelectWithNaming(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT _1 as a, _2 as b FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
-      "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
-      "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n"
-
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test(expected = classOf[ValidationException])
-  def testInvalidFields(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, foo FROM MyTable"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    tEnv.sql(sqlQuery)
-  }
-
-  @Test
-  def testAllRejectingFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE false"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAllPassingFilter(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE true"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" + "4,3,Hello world, " +
-      "how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" + "7,4," +
-      "Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" + "11,5," +
-      "Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" + "14,5,Comment#8\n" + "15,5," +
-      "Comment#9\n" + "16,6,Comment#10\n" + "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19," +
-      "6,Comment#13\n" + "20,6,Comment#14\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnString(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE c LIKE '%world%'"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "3,2,Hello world\n" + "4,3,Hello world, how are you?\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterOnInteger(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)=0"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "2,2,Hello\n" + "4,3,Hello world, how are you?\n" +
-      "6,3,Luke Skywalker\n" + "8,4," + "Comment#2\n" + "10,4,Comment#4\n" +
-      "12,5,Comment#6\n" + "14,5,Comment#8\n" + "16,6," +
-      "Comment#10\n" + "18,6,Comment#12\n" + "20,6,Comment#14\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testDisjunctivePredicate(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE a < 2 OR a > 20"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1,1,Hi\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testFilterWithAnd(): Unit = {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT * FROM MyTable WHERE MOD(a,2)<>0 AND MOD(b,2)=0"
-
-    val ds = CollectionDataSets.get3TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c)
-    tEnv.registerTable("MyTable", ds)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "3,2,Hello world\n" + "7,4,Comment#1\n" +
-      "9,4,Comment#3\n" + "17,6,Comment#11\n" +
-      "19,6,Comment#13\n" + "21,6,Comment#15\n"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testAdvancedDataTypes(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    val sqlQuery = "SELECT a, b, c, DATE '1984-07-12', TIME '14:34:24', " +
-      "TIMESTAMP '1984-07-12 14:34:24' FROM MyTable"
-
-    val ds = env.fromElements((
-      Date.valueOf("1984-07-12"),
-      Time.valueOf("14:34:24"),
-      Timestamp.valueOf("1984-07-12 14:34:24")))
-    tEnv.registerDataSet("MyTable", ds, 'a, 'b, 'c)
-
-    val result = tEnv.sql(sqlQuery)
-
-    val expected = "1984-07-12,14:34:24,1984-07-12 14:34:24.0," +
-      "1984-07-12,14:34:24,1984-07-12 14:34:24.0"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-
-  @Test
-  def testUserDefinedScalarFunction(): Unit = {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val tEnv = TableEnvironment.getTableEnvironment(env, config)
-
-    tEnv.registerFunction("hashCode",
-      new org.apache.flink.api.java.batch.table.CalcITCase.OldHashCode)
-    tEnv.registerFunction("hashCode", MyHashCode)
-
-    val ds = env.fromElements("a", "b", "c")
-    tEnv.registerDataSet("MyTable", ds, 'text)
-
-    val result = tEnv.sql("SELECT hashCode(text) FROM MyTable")
-
-    val expected = "97\n98\n99"
-    val results = result.toDataSet[Row].collect()
-    TestBaseUtils.compareResultAsText(results.asJava, expected)
-  }
-}
-
-object FilterITCase {
-  object MyHashCode extends ScalarFunction {
-    def eval(s: String): Int = s.hashCode()
-  }
-}
-
-object CalcITCase {
-
-  @Parameterized.Parameters(name = "Execution mode = {0}, Table config = {1}")
-  def parameters(): util.Collection[Array[java.lang.Object]] = {
-    Seq[Array[AnyRef]](
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.DEFAULT),
-      Array(TestExecutionMode.COLLECTION, TableProgramsTestBase.NO_NULL)).asJava
-  }
-}