Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:52 UTC

[43/44] flink git commit: [FLINK-6617] [table] Restructuring of tests

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
new file mode 100644
index 0000000..eb97afe
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.CommonTestData;
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link BatchTableSource}.
+ */
+@RunWith(Parameterized.class)
+public class JavaTableSourceITCase extends TableProgramsCollectionTestBase {
+
+	public JavaTableSourceITCase(TableConfigMode configMode) {
+		super(configMode);
+	}
+
+	@Test
+	public void testBatchTableSourceTableAPI() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableSource csvTable = CommonTestData.getCsvTableSource();
+
+		tableEnv.registerTableSource("persons", csvTable);
+
+		Table result = tableEnv.scan("persons")
+			.select("id, first, last, score");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected = "1,Mike,Smith,12.3\n" +
+			"2,Bob,Taylor,45.6\n" +
+			"3,Sam,Miller,7.89\n" +
+			"4,Peter,Smith,0.12\n" +
+			"5,Liz,Williams,34.5\n" +
+			"6,Sally,Miller,6.78\n" +
+			"7,Alice,Smith,90.1\n" +
+			"8,Kelly,Williams,2.34\n";
+
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testBatchTableSourceSQL() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableSource csvTable = CommonTestData.getCsvTableSource();
+
+		tableEnv.registerTableSource("persons", csvTable);
+
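+		// "last" is escaped with backticks since it would otherwise clash with a SQL keyword.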
+		Table result = tableEnv
+			.sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+
+		String expected = "Smith,1,24.6\n" +
+			"Miller,3,15.78\n" +
+			"Smith,4,0.24\n" +
+			"Miller,6,13.56\n" +
+			"Williams,8,4.68\n";
+
+		compareResultAsText(results, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
new file mode 100644
index 0000000..3c8a1cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * This test should eventually be replaced by a DataSetAggregateITCase that
+ * covers the same logic as pure unit tests. Until then, it runs as a cluster test.
+ */
+@RunWith(Parameterized.class)
+public class GroupingSetsITCase extends TableProgramsClusterTestBase {
+
+	private static final String TABLE_NAME = "MyTable";
+	private static final String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
+	private BatchTableEnvironment tableEnv;
+
+	public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {
+		super(mode, tableConfigMode);
+	}
+
+	@Before
+	public void setupTables() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+
+		DataSet<Tuple3<Integer, Long, String>> dataSet = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(TABLE_NAME, dataSet);
+
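+		// Create a second table in which every f2 value containing "world" is replaced by null.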
+		MapOperator<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> dataSetWithNulls =
+			dataSet.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+
+				@Override
+				public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) throws Exception {
+					if (value.f2.toLowerCase().contains("world")) {
+						value.f2 = null;
+					}
+					return value;
+				}
+			});
+		tableEnv.registerDataSet(TABLE_WITH_NULLS_NAME, dataSetWithNulls);
+	}
+
+	@Test
+	public void testGroupingSets() throws Exception {
+		String query =
+			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+				" GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+				" GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+				" GROUPING_ID(f1, f2) as gid " +
+				" FROM " + TABLE_NAME +
+				" GROUP BY GROUPING SETS (f1, f2)";
+
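+		// Expected columns per row: f1, f2, a (avg), g (GROUP_ID), gf1, gf2, gif1, gif2, gid.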
+		String expected =
+			"1,null,1,1,0,1,0,1,1\n" +
+			"6,null,18,1,0,1,0,1,1\n" +
+			"2,null,2,1,0,1,0,1,1\n" +
+			"4,null,8,1,0,1,0,1,1\n" +
+			"5,null,13,1,0,1,0,1,1\n" +
+			"3,null,5,1,0,1,0,1,1\n" +
+			"null,Comment#11,17,2,1,0,1,0,2\n" +
+			"null,Comment#8,14,2,1,0,1,0,2\n" +
+			"null,Comment#2,8,2,1,0,1,0,2\n" +
+			"null,Comment#1,7,2,1,0,1,0,2\n" +
+			"null,Comment#14,20,2,1,0,1,0,2\n" +
+			"null,Comment#7,13,2,1,0,1,0,2\n" +
+			"null,Comment#6,12,2,1,0,1,0,2\n" +
+			"null,Comment#3,9,2,1,0,1,0,2\n" +
+			"null,Comment#12,18,2,1,0,1,0,2\n" +
+			"null,Comment#5,11,2,1,0,1,0,2\n" +
+			"null,Comment#15,21,2,1,0,1,0,2\n" +
+			"null,Comment#4,10,2,1,0,1,0,2\n" +
+			"null,Hi,1,2,1,0,1,0,2\n" +
+			"null,Comment#10,16,2,1,0,1,0,2\n" +
+			"null,Hello world,3,2,1,0,1,0,2\n" +
+			"null,I am fine.,5,2,1,0,1,0,2\n" +
+			"null,Hello world, how are you?,4,2,1,0,1,0,2\n" +
+			"null,Comment#9,15,2,1,0,1,0,2\n" +
+			"null,Comment#13,19,2,1,0,1,0,2\n" +
+			"null,Luke Skywalker,6,2,1,0,1,0,2\n" +
+			"null,Hello,2,2,1,0,1,0,2";
+
+		checkSql(query, expected);
+	}
+
+	@Test
+	public void testGroupingSetsWithNulls() throws Exception {
+		String query =
+			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g FROM " + TABLE_WITH_NULLS_NAME +
+				" GROUP BY GROUPING SETS (f1, f2)";
+
+		String expected =
+			"6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
+				"null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
+				"null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" +
+				"null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" +
+				"null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" +
+				"null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" +
+				"null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" +
+				"null,Comment#1,7,2";
+
+		checkSql(query, expected);
+	}
+
+	@Test
+	public void testCubeAsGroupingSets() throws Exception {
+		String cubeQuery =
+			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+				" GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+				" GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+				" GROUPING_ID(f1, f2) as gid " +
+				" FROM " + TABLE_NAME + " GROUP BY CUBE (f1, f2)";
+
+		String groupingSetsQuery =
+			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+				" GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+				" GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+				" GROUPING_ID(f1, f2) as gid " +
+				" FROM " + TABLE_NAME +
+				" GROUP BY GROUPING SETS ((f1, f2), (f1), (f2), ())";
+
+		compareSql(cubeQuery, groupingSetsQuery);
+	}
+
+	@Test
+	public void testRollupAsGroupingSets() throws Exception {
+		String rollupQuery =
+			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+				" GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+				" GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+				" GROUPING_ID(f1, f2) as gid " +
+				" FROM " + TABLE_NAME + " GROUP BY ROLLUP (f1, f2)";
+
+		String groupingSetsQuery =
+			"SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+				" GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+				" GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+				" GROUPING_ID(f1, f2) as gid " +
+				" FROM " + TABLE_NAME +
+				" GROUP BY GROUPING SETS ((f1, f2), (f1), ())";
+
+		compareSql(rollupQuery, groupingSetsQuery);
+	}
+
+	/**
+	 * Executes a SQL query and compares its result against the expected string.
+	 *
+	 * @param query    SQL query.
+	 * @param expected Expected result.
+	 */
+	private void checkSql(String query, String expected) throws Exception {
+		Table resultTable = tableEnv.sql(query);
+		DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class);
+		List<Row> results = resultDataSet.collect();
+		TestBaseUtils.compareResultAsText(results, expected);
+	}
+
+	private void compareSql(String query1, String query2) throws Exception {
+
+		// Function to map row to string
+		MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+			@Override
+			public String map(Row value) throws Exception {
+				return value == null ? "null" : value.toString();
+			}
+		};
+
+		// Execute first query and store results
+		Table resultTable1 = tableEnv.sql(query1);
+		DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class);
+		List<String> results1 = resultDataSet1.map(mapFunction).collect();
+
+		// Execute second query and store results
+		Table resultTable2 = tableEnv.sql(query2);
+		DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class);
+		List<String> results2 = resultDataSet2.map(mapFunction).collect();
+
+		// Compare results
+		TestBaseUtils.compareResultCollections(results1, results2, new Comparator<String>() {
+
+			@Override
+			public int compare(String o1, String o2) {
+				// null-safe comparison; nulls sort before non-null values
+				if (o1 == null) {
+					return o2 == null ? 0 : -1;
+				}
+				return o2 == null ? 1 : o1.compareTo(o2);
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
new file mode 100644
index 0000000..c5a394a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for batch SQL.
+ */
+@RunWith(Parameterized.class)
+public class JavaSqlITCase extends TableProgramsCollectionTestBase {
+
+	public JavaSqlITCase(TableConfigMode configMode) {
+		super(configMode);
+	}
+
+	@Test
+	public void testValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
+			"(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
+			"(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+
+		List<Row> results = resultSet.collect();
+		String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
+			"2,Hello,true,1944-02-24,12.6666666650000000\n" +
+			// Calcite casts all rows to common types: strings are padded and decimals are scaled to equal length
+			"1,Test ,true,1944-02-24,12.4444444444444445\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testSelectFromTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table in = tableEnv.fromDataSet(ds, "a,b,c");
+		tableEnv.registerTable("T", in);
+
+		String sqlQuery = "SELECT a, c FROM T";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
+			"4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
+			"7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
+			"11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
+			"14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
+			"17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
+			"20,Comment#14\n" + "21,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testFilterFromDataSet() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
+
+		String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "2\n" + "3\n" + "4";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAggregation() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("AggTable", ds, "x, y, z");
+
+		String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
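+		// sum(1..21) = 231, min = 1, max = 21, count = 21, integer avg = 231 / 21 = 11.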
+		String expected = "231,1,21,21,11";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testJoin() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+		tableEnv.registerDataSet("t1", ds1, "a, b, c");
+		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
+
+		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testMap() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
+		rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
+		rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
+
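+		// Declare the tuple type explicitly; the map's key and value types are erased at runtime.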
+		TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
+		DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
+		tableEnv.registerDataSet("t1", ds1, "a, b");
+
+		String sqlQuery = "SELECT b['foo'] FROM t1";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "bar\n" + "spam\n";
+		compareResultAsText(results, expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
new file mode 100644
index 0000000..be7879a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfigBuilder;
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link BatchTableEnvironment}.
+ */
+@RunWith(Parameterized.class)
+public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase {
+
+	public JavaTableEnvironmentITCase(TableConfigMode configMode) {
+		super(configMode);
+	}
+
+	@Parameterized.Parameters(name = "Table config = {0}")
+	public static Collection<Object[]> parameters() {
+		return Arrays.asList(new Object[][] {
+			{ TableProgramsTestBase.DEFAULT() }
+		});
+	}
+
+	@Test
+	public void testSimpleRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds);
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("f0, f1");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+				"8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testRegisterWithFields() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet(tableName, ds, "a, b, c");
+		Table t = tableEnv.scan(tableName);
+
+		Table result = t.select("a, b, c");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+				"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+				"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+				"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+				"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+				"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+				"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterExistingDatasetTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		tableEnv.registerDataSet("MyTable", ds);
+		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
+				CollectionDataSets.getSmall5TupleDataSet(env);
+		// Must fail. Name is already used for different table.
+		tableEnv.registerDataSet("MyTable", ds2);
+	}
+
+	@Test(expected = TableException.class)
+	public void testScanUnregisteredTable() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. No table registered under that name.
+		tableEnv.scan("nonRegisteredTable");
+	}
+
+	@Test
+	public void testTableRegister() throws Exception {
+		final String tableName = "MyTable";
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.registerTable(tableName, t);
+		Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
+
+		DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+		List<Row> results = resultSet.collect();
+		String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
+				"13,5\n" + "14,5\n" + "15,5\n" +
+				"16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testIllegalName() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		// Must fail. Table name matches internal name pattern.
+		tableEnv.registerTable("_DataSetTable_42", t);
+	}
+
+	@Test(expected = TableException.class)
+	public void testRegisterTableFromOtherEnv() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());
+		BatchTableEnvironment tableEnv2 = TableEnvironment.getTableEnvironment(env, config());
+
+		Table t = tableEnv1.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+		// Must fail. Table is bound to different TableEnvironment.
+		tableEnv2.registerTable("MyTable", t);
+	}
+
+	@Test
+	public void testAsFromTuple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+			.select("a, b, c");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+			"4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+			"7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+			"11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+			"14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+			"17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+			"20,6,Comment#14\n" + "21,6,Comment#15\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToTuple() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		Table table = tableEnv
+			.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+			.select("a, b, c");
+
+		TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
+			BasicTypeInfo.INT_TYPE_INFO,
+			BasicTypeInfo.LONG_TYPE_INFO,
+			BasicTypeInfo.STRING_TYPE_INFO);
+
+		DataSet<?> ds = tableEnv.toDataSet(table, ti);
+		List<?> results = ds.collect();
+		String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
+			"(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
+			"(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
+			"(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
+			"(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
+			"(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
+			"(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Ignore
+	@Test
+	public void testAsFromTupleToPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
+		data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
+		data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
+		data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data), "q, w, e, r")
+			.select("q as a, w as b, e as c, r as d");
+
+		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+		List<SmallPojo2> results = ds.collect();
+		String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d," +
+				"roles as e")
+			.select("a, b, c, d, e");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter,[42]\n" +
+			"Engineering,56,10000.0,Anna,[]\n" +
+			"HR,42,6000.0,Lucy,[1, 2, 3]\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromPrivateFieldsPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PrivateSmallPojo> data = new ArrayList<>();
+		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<SmallPojo> data = new ArrayList<>();
+		data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+		data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+		data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d," +
+				"roles AS e")
+			.select("a, b, c, d, e");
+
+		DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+		List<SmallPojo2> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter,[42]\n" +
+			"Engineering,56,10000.0,Anna,[]\n" +
+			"HR,42,6000.0,Lucy,[1, 2, 3]\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsFromAndToPrivateFieldPojo() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PrivateSmallPojo> data = new ArrayList<>();
+		data.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+		data.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+		data.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"department AS a, " +
+				"age AS b, " +
+				"salary AS c, " +
+				"name AS d")
+			.select("a, b, c, d");
+
+		DataSet<PrivateSmallPojo2> ds = tableEnv.toDataSet(table, PrivateSmallPojo2.class);
+		List<PrivateSmallPojo2> results = ds.collect();
+		String expected =
+			"Sales,28,4000.0,Peter\n" +
+			"Engineering,56,10000.0,Anna\n" +
+			"HR,42,6000.0,Lucy\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test
+	public void testAsWithPojoAndGenericTypes() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		List<PojoWithGeneric> data = new ArrayList<>();
+		data.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
+		HashMap<String, String> hm1 = new HashMap<>();
+		hm1.put("test1", "test1");
+		data.add(new PojoWithGeneric("Anna", 56, hm1, new ArrayList<String>()));
+		HashMap<String, String> hm2 = new HashMap<>();
+		hm2.put("abc", "cde");
+		data.add(new PojoWithGeneric("Lucy", 42, hm2, new ArrayList<String>()));
+
+		Table table = tableEnv
+			.fromDataSet(env.fromCollection(data),
+				"name AS a, " +
+				"age AS b, " +
+				"generic AS c, " +
+				"generic2 AS d")
+			.select("a, b, c, c as c2, d")
+			.select("a, b, c, c === c2, d");
+
+		DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+		List<Row> results = ds.collect();
+		String expected =
+			"Peter,28,{},true,[]\n" +
+			"Anna,56,{test1=test1},true,[]\n" +
+			"Lucy,42,{abc=cde},true,[]\n";
+		compareResultAsText(results, expected);
+	}
+
+	@Test(expected = TableException.class)
+	public void testGenericRow() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// use a null value to enforce a GenericType
+		DataSet<Row> dataSet = env.fromElements(Row.of(1, 2L, "Hello", null));
+		assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+		assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+		// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+		tableEnv.fromDataSet(dataSet);
+	}
+
+	@Test(expected = TableException.class)
+	public void testGenericRowWithAlias() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// use a null value to enforce a GenericType
+		DataSet<Row> dataSet = env.fromElements(Row.of((Integer) null));
+		assertTrue(dataSet.getType() instanceof GenericTypeInfo);
+		assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
+
+		// Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+		tableEnv.fromDataSet(dataSet, "nullField");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithToManyFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Too many field names specified.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c, d");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithAmbiguousFields() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. Specified field names are not unique.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, b");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithNonFieldReference1() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. as() only allows field name expressions.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a + 1, b, c");
+	}
+
+	@Test(expected = TableException.class)
+	public void testAsWithNonFieldReference2() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail. as() only allows field name expressions.
+		tableEnv.fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a as foo, b, c");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassInput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		tableEnv.fromDataSet(env.fromElements(new MyNonStatic()), "name");
+	}
+
+	@Test(expected = TableException.class)
+	public void testNonStaticClassOutput() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+		// Must fail since class is not static
+		Table t = tableEnv.fromDataSet(env.fromElements(1, 2, 3), "number");
+		tableEnv.toDataSet(t, MyNonStatic.class);
+	}
+
+	@Test(expected = TableException.class)
+	public void testCustomCalciteConfig() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
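+		// Empty logical and physical rule sets leave the optimizer unable to translate the plan.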
+		CalciteConfig cc = new CalciteConfigBuilder()
+				.replaceLogicalOptRuleSet(RuleSets.ofList())
+				.replacePhysicalOptRuleSet(RuleSets.ofList())
+				.build();
+		tableEnv.getConfig().setCalciteConfig(cc);
+
+		DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+		Table t = tableEnv.fromDataSet(ds);
+		tableEnv.toDataSet(t, Row.class);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Non-static class.
+	 */
+	public class MyNonStatic {
+		public int number;
+	}
+
+	/**
+	 * Small POJO.
+	 */
+	@SuppressWarnings("unused")
+	public static class SmallPojo {
+
+		public SmallPojo() { }
+
+		public SmallPojo(String name, int age, double salary, String department, Integer[] roles) {
+			this.name = name;
+			this.age = age;
+			this.salary = salary;
+			this.department = department;
+			this.roles = roles;
+		}
+
+		public String name;
+		public int age;
+		public double salary;
+		public String department;
+		public Integer[] roles;
+	}
+
+	/**
+	 * POJO with generic fields.
+	 */
+	@SuppressWarnings("unused")
+	public static class PojoWithGeneric {
+		public String name;
+		public int age;
+		public HashMap<String, String> generic;
+		public ArrayList<String> generic2;
+
+		public PojoWithGeneric() {
+			// default constructor
+		}
+
+		public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
+				ArrayList<String> generic2) {
+			this.name = name;
+			this.age = age;
+			this.generic = generic;
+			this.generic2 = generic2;
+		}
+
+		@Override
+		public String toString() {
+			return name + "," + age + "," + generic + "," + generic2;
+		}
+	}
+
+	/**
+	 * Small POJO with private fields.
+	 */
+	@SuppressWarnings("unused")
+	public static class PrivateSmallPojo {
+
+		public PrivateSmallPojo() { }
+
+		public PrivateSmallPojo(String name, int age, double salary, String department) {
+			this.name = name;
+			this.age = age;
+			this.salary = salary;
+			this.department = department;
+		}
+
+		private String name;
+		private int age;
+		private double salary;
+		private String department;
+
+		public String getName() {
+			return name;
+		}
+
+		public void setName(String name) {
+			this.name = name;
+		}
+
+		public int getAge() {
+			return age;
+		}
+
+		public void setAge(int age) {
+			this.age = age;
+		}
+
+		public double getSalary() {
+			return salary;
+		}
+
+		public void setSalary(double salary) {
+			this.salary = salary;
+		}
+
+		public String getDepartment() {
+			return department;
+		}
+
+		public void setDepartment(String department) {
+			this.department = department;
+		}
+	}
+
+	/**
+	 * Another small POJO.
+	 */
+	@SuppressWarnings("unused")
+	public static class SmallPojo2 {
+
+		public SmallPojo2() { }
+
+		public SmallPojo2(String a, int b, double c, String d, Integer[] e) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+			this.d = d;
+			this.e = e;
+		}
+
+		public String a;
+		public int b;
+		public double c;
+		public String d;
+		public Integer[] e;
+
+		@Override
+		public String toString() {
+			return a + "," + b + "," + c + "," + d + "," + Arrays.toString(e);
+		}
+	}
+
+	/**
+	 * Another small POJO with private fields.
+	 */
+	@SuppressWarnings("unused")
+	public static class PrivateSmallPojo2 {
+
+		public PrivateSmallPojo2() { }
+
+		public PrivateSmallPojo2(String a, int b, double c, String d) {
+			this.a = a;
+			this.b = b;
+			this.c = c;
+			this.d = d;
+		}
+
+		private String a;
+		private int b;
+		private double c;
+		private String d;
+
+		public String getA() {
+			return a;
+		}
+
+		public void setA(String a) {
+			this.a = a;
+		}
+
+		public int getB() {
+			return b;
+		}
+
+		public void setB(int b) {
+			this.b = b;
+		}
+
+		public double getC() {
+			return c;
+		}
+
+		public void setC(double c) {
+			this.c = c;
+		}
+
+		public String getD() {
+			return d;
+		}
+
+		public void setD(String d) {
+			this.d = d;
+		}
+
+		@Override
+		public String toString() {
+			return a + "," + b + "," + c + "," + d;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
new file mode 100644
index 0000000..c6368d4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.runtime.utils.JavaStreamTestData;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration tests for streaming SQL.
+ */
+public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void testRowRegisterRowWithNames() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		List<Row> data = new ArrayList<>();
+		data.add(Row.of(1, 1L, "Hi"));
+		data.add(Row.of(2, 2L, "Hello"));
+		data.add(Row.of(3, 2L, "Hello world"));
+
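+		// Rows carry no field type information, so declare the field types and names explicitly.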
+		TypeInformation<?>[] types = {
+				BasicTypeInfo.INT_TYPE_INFO,
+				BasicTypeInfo.LONG_TYPE_INFO,
+				BasicTypeInfo.STRING_TYPE_INFO};
+		String[] names = {"a", "b", "c"};
+
+		RowTypeInfo typeInfo = new RowTypeInfo(types, names);
+
+		DataStream<Row> ds = env.fromCollection(data).returns(typeInfo);
+
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTableRow", in);
+
+		String sqlQuery = "SELECT a,c FROM MyTableRow";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,Hi");
+		expected.add("2,Hello");
+		expected.add("3,Hello world");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testSelect() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds = JavaStreamTestData.getSmall3TupleDataSet(env);
+		Table in = tableEnv.fromDataStream(ds, "a,b,c");
+		tableEnv.registerTable("MyTable", in);
+
+		String sqlQuery = "SELECT * FROM MyTable";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testFilter() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds = JavaStreamTestData.get5TupleDataStream(env);
+		tableEnv.registerDataStream("MyTable", ds, "a, b, c, d, e");
+
+		String sqlQuery = "SELECT a, b, e FROM MyTable WHERE c < 4";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,1");
+		expected.add("2,2,2");
+		expected.add("2,3,1");
+		expected.add("3,4,2");
+
+		StreamITCase.compareWithList(expected);
+	}
+
+	@Test
+	public void testUnion() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+		StreamITCase.clear();
+
+		DataStream<Tuple3<Integer, Long, String>> ds1 = JavaStreamTestData.getSmall3TupleDataSet(env);
+		Table t1 = tableEnv.fromDataStream(ds1, "a,b,c");
+		tableEnv.registerTable("T1", t1);
+
+		DataStream<Tuple5<Integer, Long, Integer, String, Long>> ds2 = JavaStreamTestData.get5TupleDataStream(env);
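+		// The alias list "a, b, d, c, e" maps the tuple's String field f3 to column c.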
+		tableEnv.registerDataStream("T2", ds2, "a, b, d, c, e");
+
+		String sqlQuery = "SELECT * FROM T1 " +
+							"UNION ALL " +
+							"(SELECT a, b, c FROM T2 WHERE a	< 3)";
+		Table result = tableEnv.sql(sqlQuery);
+
+		DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
+		resultSet.addSink(new StreamITCase.StringSink<Row>());
+		env.execute();
+
+		List<String> expected = new ArrayList<>();
+		expected.add("1,1,Hi");
+		expected.add("2,2,Hello");
+		expected.add("3,2,Hello world");
+		expected.add("1,1,Hallo");
+		expected.add("2,2,Hallo Welt");
+		expected.add("2,3,Hallo Welt wie");
+
+		StreamITCase.compareWithList(expected);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java
new file mode 100644
index 0000000..6221834
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+
+/**
+ * POJOs for Table API testing.
+ */
+public class JavaPojos {
+
+	/**
+	 * Pojo1 for test.
+	 */
+	public static class Pojo1 implements Serializable {
+
+		public Timestamp ts;
+		public String msg;
+
+		@Override
+		public String toString() {
+			return "Pojo1{" +
+					"ts=" + ts +
+					", msg='" + msg + '\'' +
+					'}';
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java
new file mode 100644
index 0000000..b42c119
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Test data.
+ */
+public class JavaStreamTestData {
+
+	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
+
+		List<Tuple3<Integer, Long, String>> data = new ArrayList<>();
+		data.add(new Tuple3<>(1, 1L, "Hi"));
+		data.add(new Tuple3<>(2, 2L, "Hello"));
+		data.add(new Tuple3<>(3, 2L, "Hello world"));
+
+		Collections.shuffle(data);
+
+		return env.fromCollection(data);
+	}
+
+	public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+
+		List<Tuple5<Integer, Long, Integer, String, Long>> data = new ArrayList<>();
+		data.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+		data.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+		data.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+		data.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+		data.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+		data.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+		data.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+		data.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+		data.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+		data.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+		data.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+		data.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+		data.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+		data.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+		data.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+		return env.fromCollection(data);
+	}
+}
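
As a usage sketch (not part of this commit), either data set can be registered
under explicit field names; the driver class, the table name "T", and the
printing sink are illustrative:

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.types.Row;

public class StreamTestDataExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

		DataStream<Tuple3<Integer, Long, String>> ds =
			JavaStreamTestData.getSmall3TupleDataSet(env);

		// Tuple fields f0, f1, f2 are renamed to a, b, c.
		tEnv.registerDataStream("T", ds, "a, b, c");
		Table result = tEnv.scan("T").select("a, c");

		tEnv.toAppendStream(result, Row.class).print();
		env.execute();
	}
}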

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
new file mode 100644
index 0000000..4d06bc2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.functions.AggregateFunction;
+
+import java.util.Iterator;
+
+/**
+ * Test aggregator functions.
+ */
+public class JavaUserDefinedAggFunctions {
+	/**
+	 * Accumulator for testing requiresOver.
+	 */
+	public static class Accumulator0 extends Tuple2<Long, Integer> {}
+
+	/**
+	 * Aggregate function for testing requiresOver.
+	 */
+	public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
+		@Override
+		public Accumulator0 createAccumulator() {
+			return new Accumulator0();
+		}
+
+		@Override
+		public Long getValue(Accumulator0 accumulator) {
+			return 1L;
+		}
+
+		// Overloaded accumulate method
+		public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
+		}
+
+		@Override
+		public boolean requiresOver() {
+			return true;
+		}
+	}
+
+	/**
+	 * Accumulator for WeightedAvg.
+	 */
+	public static class WeightedAvgAccum {
+		public long sum = 0;
+		public int count = 0;
+	}
+
+	/**
+	 * Base class for the weighted average aggregate functions.
+	 */
+	public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
+		@Override
+		public WeightedAvgAccum createAccumulator() {
+			return new WeightedAvgAccum();
+		}
+
+		@Override
+		public Long getValue(WeightedAvgAccum accumulator) {
+			if (accumulator.count == 0) {
+				return null;
+			} else {
+				return accumulator.sum / accumulator.count;
+			}
+		}
+
+		// Overloaded accumulate method
+		public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+			accumulator.sum += iValue * iWeight;
+			accumulator.count += iWeight;
+		}
+
+		// Overloaded accumulate method
+		public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+			accumulator.sum += iValue * iWeight;
+			accumulator.count += iWeight;
+		}
+	}
+
+	/**
+	 * A WeightedAvg with a merge method.
+	 */
+	public static class WeightedAvgWithMerge extends WeightedAvg {
+		public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
+			Iterator<WeightedAvgAccum> iter = it.iterator();
+			while (iter.hasNext()) {
+				WeightedAvgAccum a = iter.next();
+				acc.count += a.count;
+				acc.sum += a.sum;
+			}
+		}
+	}
+
+	/**
+	 * A WeightedAvg with merge and reset methods.
+	 */
+	public static class WeightedAvgWithMergeAndReset extends WeightedAvgWithMerge {
+		public void resetAccumulator(WeightedAvgAccum acc) {
+			acc.count = 0;
+			acc.sum = 0L;
+		}
+	}
+
+	/**
+	 * A WeightedAvg with retract methods.
+	 */
+	public static class WeightedAvgWithRetract extends WeightedAvg {
+		// Overloaded retract method
+		public void retract(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+			accumulator.sum -= iValue * iWeight;
+			accumulator.count -= iWeight;
+		}
+
+		// Overloaded retract method
+		public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+			accumulator.sum -= iValue * iWeight;
+			accumulator.count -= iWeight;
+		}
+	}
+}
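
For reference, a minimal sketch of registering WeightedAvg and calling it from
SQL (the driver class and the names "T" and "wAvg" are illustrative, not part
of this commit; grouped aggregates on streams produce a retraction stream):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.table.runtime.utils.JavaUserDefinedAggFunctions.WeightedAvg;
import org.apache.flink.types.Row;

public class WeightedAvgExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

		tEnv.registerDataStream(
			"T", JavaStreamTestData.get5TupleDataStream(env), "a, b, c, d, e");
		// Register the aggregate under a name visible to SQL and the Table API.
		tEnv.registerFunction("wAvg", new WeightedAvg());

		// Average of b (long values) weighted by c (int weights), per key a.
		Table result = tEnv.sql("SELECT a, wAvg(b, c) FROM T GROUP BY a");

		tEnv.toRetractStream(result, Row.class).print();
		env.execute();
	}
}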

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java
new file mode 100644
index 0000000..a77ad9a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Arrays;
+
+/**
+ * Test scalar functions.
+ */
+public class JavaUserDefinedScalarFunctions {
+
+	/**
+	 * Increments the input.
+	 */
+	public static class JavaFunc0 extends ScalarFunction {
+		public long eval(Long l) {
+			return l + 1;
+		}
+	}
+
+	/**
+	 * Concatenates the inputs into a string.
+	 */
+	public static class JavaFunc1 extends ScalarFunction {
+		public String eval(Integer a, int b, Long c) {
+			return a + " and " + b + " and " + c;
+		}
+	}
+
+	/**
+	 * Appends the product of the varargs to the string.
+	 */
+	public static class JavaFunc2 extends ScalarFunction {
+		public String eval(String s, Integer... a) {
+			int m = 1;
+			for (int n : a) {
+				m *= n;
+			}
+			return s + m;
+		}
+	}
+
+	/**
+	 * Tests method overloading.
+	 */
+	public static class JavaFunc3 extends ScalarFunction {
+		public int eval(String a, int... b) {
+			return b.length;
+		}
+
+		public String eval(String c) {
+			return c;
+		}
+	}
+
+	/**
+	 * Concatenates the string representations of the two arrays.
+	 */
+	public static class JavaFunc4 extends ScalarFunction {
+		public String eval(Integer[] a, String[] b) {
+			return Arrays.toString(a) + " and " + Arrays.toString(b);
+		}
+	}
+
+}
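
A usage sketch for the scalar functions (the driver class and the names "T"
and "inc" are illustrative; any of the classes above registers the same way):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.table.runtime.utils.JavaUserDefinedScalarFunctions.JavaFunc0;

public class ScalarFunctionExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

		tEnv.registerDataStream(
			"T", JavaStreamTestData.getSmall3TupleDataSet(env), "a, b, c");
		tEnv.registerFunction("inc", new JavaFunc0());

		// The registered name works in Table API expressions and in SQL.
		Table apiResult = tEnv.scan("T").select("a, inc(b)");
		Table sqlResult = tEnv.sql("SELECT a, inc(b) FROM T");
	}
}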

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
new file mode 100644
index 0000000..cd92f49
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.table.functions.TableFunction;
+
+/**
+ * Test table functions.
+ */
+public class JavaUserDefinedTableFunctions {
+
+	/**
+	 * Emits each input value as a long.
+	 */
+	public static class JavaTableFunc0 extends TableFunction<Long> {
+		public void eval(Integer a, Long b, Long c) {
+			collect(a.longValue());
+			collect(b);
+			collect(c);
+		}
+	}
+}
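
A sketch of applying JavaTableFunc0 as a user-defined table function (the
driver class and the names "T" and "func0" are illustrative; the eval above
emits three rows per input row):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.runtime.utils.JavaStreamTestData;
import org.apache.flink.table.runtime.utils.JavaUserDefinedTableFunctions.JavaTableFunc0;

public class TableFunctionExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

		tEnv.registerDataStream(
			"T", JavaStreamTestData.get5TupleDataStream(env), "a, b, c, d, e");
		tEnv.registerFunction("func0", new JavaTableFunc0());

		// Table API: join each row with the rows emitted by the function.
		Table apiResult = tEnv.scan("T").join("func0(a, b, e) as (x)").select("a, x");
		// SQL: the equivalent LATERAL TABLE join.
		Table sqlResult = tEnv.sql(
			"SELECT a, x FROM T, LATERAL TABLE(func0(a, b, e)) AS S(x)");
	}
}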

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
deleted file mode 100644
index 9bc3e51..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,509 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.{Ignore, Test}
-
-class ExpressionReductionTest extends TableTestBase {
-
-  @Test
-  def testReduceCalcExpressionForBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceProjectExpressionForBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      )
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceFilterExpressionForBatchSQL(): Unit = {
-    val util = batchTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "*" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceCalcExpressionForBatchTableAPI(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceProjectExpressionForBatchTableAPI(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceFilterExpressionForBatchTableAPI(): Unit = {
-    val util = batchTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceCalcExpressionForStreamSQL(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceProjectExpressionForStreamSQL(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "(3+4)+a, " +
-      "b+(1+2), " +
-      "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
-      "TRIM(BOTH ' STRING '),  " +
-      "'test' || 'string', " +
-      "NULLIF(1, 1), " +
-      "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
-      "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)),  " +
-      "1 IS NULL, " +
-      "'TEST' LIKE '%EST', " +
-      "FLOOR(2.5), " +
-      "'TEST' IN ('west', 'TEST', 'rest'), " +
-      "CAST(TRUE AS VARCHAR) || 'X'" +
-      "FROM MyTable"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "+(7, a) AS EXPR$0",
-        "+(b, 3) AS EXPR$1",
-        "'b' AS EXPR$2",
-        "'STRING' AS EXPR$3",
-        "'teststring' AS EXPR$4",
-        "null AS EXPR$5",
-        "1990-10-24 23:00:01.123 AS EXPR$6",
-        "19 AS EXPR$7",
-        "false AS EXPR$8",
-        "true AS EXPR$9",
-        "2 AS EXPR$10",
-        "true AS EXPR$11",
-        "'trueX' AS EXPR$12"
-      )
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceFilterExpressionForStreamSQL(): Unit = {
-    val util = streamTestUtil()
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val sqlQuery = "SELECT " +
-      "*" +
-      "FROM MyTable WHERE a>(1+7)"
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testReduceCalcExpressionForStreamTableAPI(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      ),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceProjectExpressionForStreamTableAPI(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result =  table
-      .select((3 + 4).toExpr + 6,
-              (11 === 1) ? ("a", "b"),
-              " STRING ".trim,
-              "test" + "string",
-              "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
-              1.isNull,
-              "TEST".like("%EST"),
-              2.5.toExpr.floor(),
-              true.cast(Types.STRING) + "X")
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select",
-        "13 AS _c0",
-        "'b' AS _c1",
-        "'STRING' AS _c2",
-        "'teststring' AS _c3",
-        "1990-10-24 23:00:01.123 AS _c4",
-        "false AS _c5",
-        "true AS _c6",
-        "2E0 AS _c7",
-        "'trueX' AS _c8"
-      )
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceFilterExpressionForStreamTableAPI(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .where('a > (1 + 7))
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", ">(a, 8)")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testNestedTablesReductionStream(): Unit = {
-    val util = streamTestUtil()
-
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
-    util.tableEnv.registerTable("NewTable", newTable)
-
-    val sqlQuery = "SELECT a FROM NewTable"
-
-    // 1+1 should be normalized to 2
-    val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  @Test
-  def testNestedTablesReductionBatch(): Unit = {
-    val util = batchTestUtil()
-
-    util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
-    util.tableEnv.registerTable("NewTable", newTable)
-
-    val sqlQuery = "SELECT a FROM NewTable"
-
-    // 1+1 should be normalized to 2
-    val expected = unaryNode("DataSetCalc", batchTableNode(0), term("select", "+(2, a) AS a"))
-
-    util.verifySql(sqlQuery, expected)
-  }
-
-  // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] is fixed
-  @Ignore
-  def testReduceDeterministicUDF(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    // if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860]
-    val result = table
-      .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
-      .where("d.isNull")
-      .select('a, 'b, 'c)
-
-    val expected: String = streamTableNode(0)
-
-    util.verifyTable(result, expected)
-  }
-
-  @Test
-  def testReduceNonDeterministicUDF(): Unit = {
-    val util = streamTestUtil()
-    val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
-    val result = table
-      .select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
-      .where("d.isNull")
-      .select('a, 'b, 'c)
-
-    val expected = unaryNode(
-      "DataStreamCalc",
-      streamTableNode(0),
-      term("select", "a", "b", "c"),
-      term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
-    )
-
-    util.verifyTable(result, expected)
-  }
-
-}
-
-object NonDeterministicNullFunc extends ScalarFunction {
-  def eval(): String = null
-  override def isDeterministic = false
-}
-
-object DeterministicNullFunc extends ScalarFunction {
-  def eval(): String = null
-  override def isDeterministic = true
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
new file mode 100644
index 0000000..a2356a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+  * Tests for query plans on tables from an external catalog.
+  */
+class ExternalCatalogTest extends TableTestBase {
+  private val table1Path: Array[String] = Array("test", "db1", "tb1")
+  private val table1TopLevelPath: Array[String] = Array("test", "tb1")
+  private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
+  private val table2Path: Array[String] = Array("test", "db2", "tb2")
+  private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
+
+  @Test
+  def testBatchTableApi(): Unit = {
+    val util = batchTestUtil()
+    val tableEnv = util.tableEnv
+
+    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tableEnv.scan("test", "db1", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+    val result = table2
+        .select('d * 2, 'e, 'g.upperCase())
+        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testBatchSQL(): Unit = {
+    val util = batchTestUtil()
+
+    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+        "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+        term("where", "<(d, 3)")),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS EXPR$0", "b", "c")
+      ),
+      term("union", "EXPR$0", "e", "g"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testStreamTableApi(): Unit = {
+    val util = streamTestUtil()
+    val tableEnv = util.tableEnv
+
+    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tableEnv.scan("test", "db1", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+
+    val result = table2.where("d < 3")
+        .select('d * 2, 'e, 'g.upperCase())
+        .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
+        term("where", "<(d, 3)")
+      ),
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union all", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  @Test
+  def testStreamSQL(): Unit = {
+    val util = streamTestUtil()
+
+    util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+        "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+    val expected = binaryNode(
+      "DataStreamUnion",
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+        term("where", "<(d, 3)")),
+      unaryNode(
+        "DataStreamCalc",
+        sourceStreamTableNode(table1Path, table1ProjectedFields),
+        term("select", "*(a, 2) AS EXPR$0", "b", "c")
+      ),
+      term("union all", "EXPR$0", "e", "g"))
+
+    util.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testTopLevelTable(): Unit = {
+    val util = batchTestUtil()
+    val tableEnv = util.tableEnv
+
+    tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+    val table1 = tableEnv.scan("test", "tb1")
+    val table2 = tableEnv.scan("test", "db2", "tb2")
+    val result = table2
+      .select('d * 2, 'e, 'g.upperCase())
+      .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+    val expected = binaryNode(
+      "DataSetUnion",
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table2Path, table2ProjectedFields),
+        term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+      ),
+      unaryNode(
+        "DataSetCalc",
+        sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
+        term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+      ),
+      term("union", "_c0", "e", "_c2")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
+  def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+        s"fields=[${fields.mkString(", ")}])"
+  }
+
+  def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+    s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+        s"fields=[${fields.mkString(", ")}])"
+  }
+}