You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/07/13 10:18:52 UTC
[43/44] flink git commit: [FLINK-6617] [table] Restructuring of tests
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
new file mode 100644
index 0000000..eb97afe
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/JavaTableSourceITCase.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.CommonTestData;
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.table.sources.BatchTableSource;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration tests for {@link BatchTableSource}.
+ *
+ * <p>Each test registers the CSV source returned by
+ * {@code CommonTestData.getCsvTableSource()} under the name "persons", runs a
+ * query against it and compares the collected rows with a fixed expected text.
+ */
+@RunWith(Parameterized.class)
+public class JavaTableSourceITCase extends TableProgramsCollectionTestBase {
+
+ /**
+ * Creates the test instance for one parameterized run.
+ *
+ * @param configMode table configuration mode, forwarded unchanged to the base class.
+ */
+ public JavaTableSourceITCase(TableConfigMode configMode) {
+ super(configMode);
+ }
+
+ /**
+ * Scans the registered "persons" source with the Table API, selects the columns
+ * id, first, last and score, and expects the eight rows listed in {@code expected}.
+ */
+ @Test
+ public void testBatchTableSourceTableAPI() throws Exception {
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+ BatchTableSource csvTable = CommonTestData.getCsvTableSource();
+
+ // Register the source so that it can be referenced by name in scan().
+ tableEnv.registerTableSource("persons", csvTable);
+
+ Table result = tableEnv.scan("persons")
+ .select("id, first, last, score");
+
+ // Convert the result table to rows and execute the program.
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected = "1,Mike,Smith,12.3\n" +
+ "2,Bob,Taylor,45.6\n" +
+ "3,Sam,Miller,7.89\n" +
+ "4,Peter,Smith,0.12\n" +
+ "5,Liz,Williams,34.5\n" +
+ "6,Sally,Miller,6.78\n" +
+ "7,Alice,Smith,90.1\n" +
+ "8,Kelly,Williams,2.34\n";
+
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Queries the registered "persons" source with SQL: projects {@code last},
+ * {@code FLOOR(id)} and {@code score * 2} for rows with {@code score < 20}
+ * and expects the five rows listed in {@code expected}.
+ */
+ @Test
+ public void testBatchTableSourceSQL() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+ BatchTableSource csvTable = CommonTestData.getCsvTableSource();
+
+ // Register the source so that it can be referenced by name in the SQL query.
+ tableEnv.registerTableSource("persons", csvTable);
+
+ // The column name "last" is quoted with backticks in the query.
+ Table result = tableEnv
+ .sql("SELECT `last`, FLOOR(id), score * 2 FROM persons WHERE score < 20");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+
+ String expected = "Smith,1,24.6\n" +
+ "Miller,3,15.78\n" +
+ "Smith,4,0.24\n" +
+ "Miller,6,13.56\n" +
+ "Williams,8,4.68\n";
+
+ compareResultAsText(results, expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
new file mode 100644
index 0000000..3c8a1cc
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/GroupingSetsITCase.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.TableProgramsClusterTestBase;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.Row;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Comparator;
+import java.util.List;
+
+/**
+ * This test should be replaced by a DataSetAggregateITCase.
+ * We should only perform logical unit tests here.
+ * Until then, we perform a cluster test here.
+ */
+@RunWith(Parameterized.class)
+public class GroupingSetsITCase extends TableProgramsClusterTestBase {
+
+ private static final String TABLE_NAME = "MyTable";
+ private static final String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
+ private BatchTableEnvironment tableEnv;
+
+ public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {
+ super(mode, tableConfigMode);
+ }
+
+ @Before
+ public void setupTables() {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ tableEnv = TableEnvironment.getTableEnvironment(env, new TableConfig());
+
+ DataSet<Tuple3<Integer, Long, String>> dataSet = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet(TABLE_NAME, dataSet);
+
+ MapOperator<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> dataSetWithNulls =
+ dataSet.map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
+
+ @Override
+ public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) throws Exception {
+ if (value.f2.toLowerCase().contains("world")) {
+ value.f2 = null;
+ }
+ return value;
+ }
+ });
+ tableEnv.registerDataSet(TABLE_WITH_NULLS_NAME, dataSetWithNulls);
+ }
+
+ @Test
+ public void testGroupingSets() throws Exception {
+ String query =
+ "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+ " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+ " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+ " GROUPING_ID(f1, f2) as gid " +
+ " FROM " + TABLE_NAME +
+ " GROUP BY GROUPING SETS (f1, f2)";
+
+ String expected =
+ "1,null,1,1,0,1,0,1,1\n" +
+ "6,null,18,1,0,1,0,1,1\n" +
+ "2,null,2,1,0,1,0,1,1\n" +
+ "4,null,8,1,0,1,0,1,1\n" +
+ "5,null,13,1,0,1,0,1,1\n" +
+ "3,null,5,1,0,1,0,1,1\n" +
+ "null,Comment#11,17,2,1,0,1,0,2\n" +
+ "null,Comment#8,14,2,1,0,1,0,2\n" +
+ "null,Comment#2,8,2,1,0,1,0,2\n" +
+ "null,Comment#1,7,2,1,0,1,0,2\n" +
+ "null,Comment#14,20,2,1,0,1,0,2\n" +
+ "null,Comment#7,13,2,1,0,1,0,2\n" +
+ "null,Comment#6,12,2,1,0,1,0,2\n" +
+ "null,Comment#3,9,2,1,0,1,0,2\n" +
+ "null,Comment#12,18,2,1,0,1,0,2\n" +
+ "null,Comment#5,11,2,1,0,1,0,2\n" +
+ "null,Comment#15,21,2,1,0,1,0,2\n" +
+ "null,Comment#4,10,2,1,0,1,0,2\n" +
+ "null,Hi,1,2,1,0,1,0,2\n" +
+ "null,Comment#10,16,2,1,0,1,0,2\n" +
+ "null,Hello world,3,2,1,0,1,0,2\n" +
+ "null,I am fine.,5,2,1,0,1,0,2\n" +
+ "null,Hello world, how are you?,4,2,1,0,1,0,2\n" +
+ "null,Comment#9,15,2,1,0,1,0,2\n" +
+ "null,Comment#13,19,2,1,0,1,0,2\n" +
+ "null,Luke Skywalker,6,2,1,0,1,0,2\n" +
+ "null,Hello,2,2,1,0,1,0,2";
+
+ checkSql(query, expected);
+ }
+
+ @Test
+ public void testGroupingSetsWithNulls() throws Exception {
+ String query =
+ "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g FROM " + TABLE_WITH_NULLS_NAME +
+ " GROUP BY GROUPING SETS (f1, f2)";
+
+ String expected =
+ "6,null,18,1\n5,null,13,1\n4,null,8,1\n3,null,5,1\n2,null,2,1\n1,null,1,1\n" +
+ "null,Luke Skywalker,6,2\nnull,I am fine.,5,2\nnull,Hi,1,2\n" +
+ "null,null,3,2\nnull,Hello,2,2\nnull,Comment#9,15,2\nnull,Comment#8,14,2\n" +
+ "null,Comment#7,13,2\nnull,Comment#6,12,2\nnull,Comment#5,11,2\n" +
+ "null,Comment#4,10,2\nnull,Comment#3,9,2\nnull,Comment#2,8,2\n" +
+ "null,Comment#15,21,2\nnull,Comment#14,20,2\nnull,Comment#13,19,2\n" +
+ "null,Comment#12,18,2\nnull,Comment#11,17,2\nnull,Comment#10,16,2\n" +
+ "null,Comment#1,7,2";
+
+ checkSql(query, expected);
+ }
+
+ @Test
+ public void testCubeAsGroupingSets() throws Exception {
+ String cubeQuery =
+ "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+ " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+ " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+ " GROUPING_ID(f1, f2) as gid " +
+ " FROM " + TABLE_NAME + " GROUP BY CUBE (f1, f2)";
+
+ String groupingSetsQuery =
+ "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+ " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+ " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+ " GROUPING_ID(f1, f2) as gid " +
+ " FROM " + TABLE_NAME +
+ " GROUP BY GROUPING SETS ((f1, f2), (f1), (f2), ())";
+
+ compareSql(cubeQuery, groupingSetsQuery);
+ }
+
+ @Test
+ public void testRollupAsGroupingSets() throws Exception {
+ String rollupQuery =
+ "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+ " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+ " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+ " GROUPING_ID(f1, f2) as gid " +
+ " FROM " + TABLE_NAME + " GROUP BY ROLLUP (f1, f2)";
+
+ String groupingSetsQuery =
+ "SELECT f1, f2, avg(f0) as a, GROUP_ID() as g, " +
+ " GROUPING(f1) as gf1, GROUPING(f2) as gf2, " +
+ " GROUPING_ID(f1) as gif1, GROUPING_ID(f2) as gif2, " +
+ " GROUPING_ID(f1, f2) as gid " +
+ " FROM " + TABLE_NAME +
+ " GROUP BY GROUPING SETS ((f1, f2), (f1), ())";
+
+ compareSql(rollupQuery, groupingSetsQuery);
+ }
+
+ /**
+ * Execute SQL query and check results.
+ *
+ * @param query SQL query.
+ * @param expected Expected result.
+ */
+ private void checkSql(String query, String expected) throws Exception {
+ Table resultTable = tableEnv.sql(query);
+ DataSet<Row> resultDataSet = tableEnv.toDataSet(resultTable, Row.class);
+ List<Row> results = resultDataSet.collect();
+ TestBaseUtils.compareResultAsText(results, expected);
+ }
+
+ private void compareSql(String query1, String query2) throws Exception {
+
+ // Function to map row to string
+ MapFunction<Row, String> mapFunction = new MapFunction<Row, String>() {
+
+ @Override
+ public String map(Row value) throws Exception {
+ return value == null ? "null" : value.toString();
+ }
+ };
+
+ // Execute first query and store results
+ Table resultTable1 = tableEnv.sql(query1);
+ DataSet<Row> resultDataSet1 = tableEnv.toDataSet(resultTable1, Row.class);
+ List<String> results1 = resultDataSet1.map(mapFunction).collect();
+
+ // Execute second query and store results
+ Table resultTable2 = tableEnv.sql(query2);
+ DataSet<Row> resultDataSet2 = tableEnv.toDataSet(resultTable2, Row.class);
+ List<String> results2 = resultDataSet2.map(mapFunction).collect();
+
+ // Compare results
+ TestBaseUtils.compareResultCollections(results1, results2, new Comparator<String>() {
+
+ @Override
+ public int compare(String o1, String o2) {
+ return o2 == null ? o1 == null ? 0 : 1 : o1.compareTo(o2);
+ }
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
new file mode 100644
index 0000000..c5a394a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.sql;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Integration tests for batch SQL.
+ *
+ * <p>Each test builds its own execution and table environment, runs one SQL
+ * statement and compares the collected rows with a fixed expected text.
+ */
+@RunWith(Parameterized.class)
+public class JavaSqlITCase extends TableProgramsCollectionTestBase {
+
+ /**
+ * Creates the test instance for one parameterized run.
+ *
+ * @param configMode table configuration mode, forwarded unchanged to the base class.
+ */
+ public JavaSqlITCase(TableConfigMode configMode) {
+ super(configMode);
+ }
+
+ /**
+ * Runs a {@code VALUES} statement with three literal rows and checks the
+ * resulting rows, including the padding applied to strings and decimals.
+ */
+ @Test
+ public void testValues() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ String sqlQuery = "VALUES (1, 'Test', TRUE, DATE '1944-02-24', 12.4444444444444445)," +
+ "(2, 'Hello', TRUE, DATE '1944-02-24', 12.666666665)," +
+ "(3, 'World', FALSE, DATE '1944-12-24', 12.54444445)";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+
+ List<Row> results = resultSet.collect();
+ String expected = "3,World,false,1944-12-24,12.5444444500000000\n" +
+ "2,Hello,true,1944-02-24,12.6666666650000000\n" +
+ // Calcite converts to decimals and strings with equal length
+ "1,Test ,true,1944-02-24,12.4444444444444445\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Registers a table created from the 3-tuple data set with fields a, b, c
+ * as "T" and selects the columns a and c from it.
+ */
+ @Test
+ public void testSelectFromTable() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table in = tableEnv.fromDataSet(ds, "a,b,c");
+ tableEnv.registerTable("T", in);
+
+ String sqlQuery = "SELECT a, c FROM T";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,Hi\n" + "2,Hello\n" + "3,Hello world\n" +
+ "4,Hello world, how are you?\n" + "5,I am fine.\n" + "6,Luke Skywalker\n" +
+ "7,Comment#1\n" + "8,Comment#2\n" + "9,Comment#3\n" + "10,Comment#4\n" +
+ "11,Comment#5\n" + "12,Comment#6\n" + "13,Comment#7\n" +
+ "14,Comment#8\n" + "15,Comment#9\n" + "16,Comment#10\n" +
+ "17,Comment#11\n" + "18,Comment#12\n" + "19,Comment#13\n" +
+ "20,Comment#14\n" + "21,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Registers the 3-tuple data set directly with fields x, y, z and selects x
+ * for the rows whose z matches {@code LIKE '%Hello%'}.
+ */
+ @Test
+ public void testFilterFromDataSet() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet("DataSetTable", ds, "x, y, z");
+
+ String sqlQuery = "SELECT x FROM DataSetTable WHERE z LIKE '%Hello%'";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "2\n" + "3\n" + "4";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Computes sum, min, max and avg of x and count of y over the whole 3-tuple
+ * data set and expects a single result row.
+ */
+ @Test
+ public void testAggregation() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet("AggTable", ds, "x, y, z");
+
+ String sqlQuery = "SELECT sum(x), min(x), max(x), count(y), avg(x) FROM AggTable";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "231,1,21,21,11";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Joins the small 3-tuple data set (t1) with the 5-tuple data set (t2) on
+ * {@code b = e} and selects the columns c and g.
+ */
+ @Test
+ public void testJoin() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds1 = CollectionDataSets.getSmall3TupleDataSet(env);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
+
+ tableEnv.registerDataSet("t1", ds1, "a, b, c");
+ tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
+
+ String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "Hi,Hallo\n" + "Hello,Hallo Welt\n" + "Hello world,Hallo Welt\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Registers a two-row data set whose second field is a {@code Map<String, String>}
+ * and reads the value stored under the key 'foo' with {@code b['foo']}.
+ */
+ @Test
+ public void testMap() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<Tuple2<Integer, Map<String, String>>> rows = new ArrayList<>();
+ rows.add(new Tuple2<>(1, Collections.singletonMap("foo", "bar")));
+ rows.add(new Tuple2<>(2, Collections.singletonMap("foo", "spam")));
+
+ // Explicit type information: an int field and a map from string to string.
+ TypeInformation<Tuple2<Integer, Map<String, String>>> ty = new TupleTypeInfo<>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO));
+
+ DataSet<Tuple2<Integer, Map<String, String>>> ds1 = env.fromCollection(rows, ty);
+ tableEnv.registerDataSet("t1", ds1, "a, b");
+
+ String sqlQuery = "SELECT b['foo'] FROM t1";
+ Table result = tableEnv.sql(sqlQuery);
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "bar\n" + "spam\n";
+ compareResultAsText(results, expected);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
new file mode 100644
index 0000000..be7879a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/table/JavaTableEnvironmentITCase.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.batch.table;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.calcite.CalciteConfig;
+import org.apache.flink.table.calcite.CalciteConfigBuilder;
+import org.apache.flink.table.runtime.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.table.runtime.utils.TableProgramsTestBase;
+import org.apache.flink.test.operators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.apache.calcite.tools.RuleSets;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link BatchTableEnvironment}.
+ */
+@RunWith(Parameterized.class)
+public class JavaTableEnvironmentITCase extends TableProgramsCollectionTestBase {
+
+ /**
+ * Creates the test instance for one parameterized run.
+ *
+ * @param configMode table configuration mode, forwarded unchanged to the base class.
+ */
+ public JavaTableEnvironmentITCase(TableConfigMode configMode) {
+ super(configMode);
+ }
+
+ /**
+ * Supplies the parameter sets for the {@link Parameterized} runner.
+ *
+ * @return a single parameter set holding {@code TableProgramsTestBase.DEFAULT()}.
+ */
+ @Parameterized.Parameters(name = "Table config = {0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ { TableProgramsTestBase.DEFAULT() }
+ });
+ }
+
+ /**
+ * Registers the 3-tuple data set without field names, scans it by name and
+ * selects the default tuple fields f0 and f1.
+ */
+ @Test
+ public void testSimpleRegister() throws Exception {
+ final String tableName = "MyTable";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet(tableName, ds);
+ Table t = tableEnv.scan(tableName);
+
+ Table result = t.select("f0, f1");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1\n" + "2,2\n" + "3,2\n" + "4,3\n" + "5,3\n" + "6,3\n" + "7,4\n" +
+ "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" + "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Registers the 3-tuple data set with the field names a, b, c, scans it by
+ * name and selects all three fields under those names.
+ */
+ @Test
+ public void testRegisterWithFields() throws Exception {
+ final String tableName = "MyTable";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet(tableName, ds, "a, b, c");
+ Table t = tableEnv.scan(tableName);
+
+ Table result = t.select("a, b, c");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Expects a {@link TableException} when a second data set is registered
+ * under a name that is already in use.
+ */
+ @Test(expected = TableException.class)
+ public void testRegisterExistingDatasetTable() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ tableEnv.registerDataSet("MyTable", ds);
+ DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 =
+ CollectionDataSets.getSmall5TupleDataSet(env);
+ // Must fail. Name is already used for different table.
+ tableEnv.registerDataSet("MyTable", ds2);
+ }
+
+ /**
+ * Expects a {@link TableException} when scanning a table name that was never registered.
+ */
+ @Test(expected = TableException.class)
+ public void testScanUnregisteredTable() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ // Must fail. No table registered under that name.
+ tableEnv.scan("nonRegisteredTable");
+ }
+
+ /**
+ * Registers a {@link Table} created from the 3-tuple data set, scans it by
+ * name, selects f0 and f1 and keeps only the rows with {@code f0 > 7}.
+ */
+ @Test
+ public void testTableRegister() throws Exception {
+ final String tableName = "MyTable";
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table t = tableEnv.fromDataSet(ds);
+ tableEnv.registerTable(tableName, t);
+ Table result = tableEnv.scan(tableName).select("f0, f1").filter("f0 > 7");
+
+ DataSet<Row> resultSet = tableEnv.toDataSet(result, Row.class);
+ List<Row> results = resultSet.collect();
+ String expected = "8,4\n" + "9,4\n" + "10,4\n" + "11,5\n" + "12,5\n" +
+ "13,5\n" + "14,5\n" + "15,5\n" +
+ "16,6\n" + "17,6\n" + "18,6\n" + "19,6\n" + "20,6\n" + "21,6\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Expects a {@link TableException} when a table is registered under the
+ * name "_DataSetTable_42".
+ */
+ @Test(expected = TableException.class)
+ public void testIllegalName() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
+ Table t = tableEnv.fromDataSet(ds);
+ // Must fail. Table name matches internal name pattern.
+ tableEnv.registerTable("_DataSetTable_42", t);
+ }
+
+ /**
+ * Expects a {@link TableException} when a table created by one table
+ * environment is registered in a second table environment.
+ */
+ @Test(expected = TableException.class)
+ public void testRegisterTableFromOtherEnv() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ // Two separate table environments on top of the same execution environment.
+ BatchTableEnvironment tableEnv1 = TableEnvironment.getTableEnvironment(env, config());
+ BatchTableEnvironment tableEnv2 = TableEnvironment.getTableEnvironment(env, config());
+
+ Table t = tableEnv1.fromDataSet(CollectionDataSets.get3TupleDataSet(env));
+ // Must fail. Table is bound to different TableEnvironment.
+ tableEnv2.registerTable("MyTable", t);
+ }
+
+ /**
+ * Creates a table from the 3-tuple data set with the field names a, b, c,
+ * selects all three fields and converts the result to {@link Row}s.
+ */
+ @Test
+ public void testAsFromTuple() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table table = tableEnv
+ .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+ .select("a, b, c");
+
+ DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
+ List<Row> results = ds.collect();
+ String expected = "1,1,Hi\n" + "2,2,Hello\n" + "3,2,Hello world\n" +
+ "4,3,Hello world, how are you?\n" + "5,3,I am fine.\n" + "6,3,Luke Skywalker\n" +
+ "7,4,Comment#1\n" + "8,4,Comment#2\n" + "9,4,Comment#3\n" + "10,4,Comment#4\n" +
+ "11,5,Comment#5\n" + "12,5,Comment#6\n" + "13,5,Comment#7\n" +
+ "14,5,Comment#8\n" + "15,5,Comment#9\n" + "16,6,Comment#10\n" +
+ "17,6,Comment#11\n" + "18,6,Comment#12\n" + "19,6,Comment#13\n" +
+ "20,6,Comment#14\n" + "21,6,Comment#15\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Creates a table from the 3-tuple data set with the field names a, b, c and
+ * converts the result back to a data set described by a {@link TupleTypeInfo}.
+ * The expected rows are therefore written in parenthesized tuple form.
+ */
+ @Test
+ public void testAsFromAndToTuple() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ Table table = tableEnv
+ .fromDataSet(CollectionDataSets.get3TupleDataSet(env), "a, b, c")
+ .select("a, b, c");
+
+ // Target type of the conversion: a tuple of (int, long, string).
+ TypeInformation<?> ti = new TupleTypeInfo<Tuple3<Integer, Long, String>>(
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ DataSet<?> ds = tableEnv.toDataSet(table, ti);
+ List<?> results = ds.collect();
+ String expected = "(1,1,Hi)\n" + "(2,2,Hello)\n" + "(3,2,Hello world)\n" +
+ "(4,3,Hello world, how are you?)\n" + "(5,3,I am fine.)\n" + "(6,3,Luke Skywalker)\n" +
+ "(7,4,Comment#1)\n" + "(8,4,Comment#2)\n" + "(9,4,Comment#3)\n" + "(10,4,Comment#4)\n" +
+ "(11,5,Comment#5)\n" + "(12,5,Comment#6)\n" + "(13,5,Comment#7)\n" +
+ "(14,5,Comment#8)\n" + "(15,5,Comment#9)\n" + "(16,6,Comment#10)\n" +
+ "(17,6,Comment#11)\n" + "(18,6,Comment#12)\n" + "(19,6,Comment#13)\n" +
+ "(20,6,Comment#14)\n" + "(21,6,Comment#15)\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+ * Creates a table from a 4-tuple collection, renames the fields q, w, e, r to
+ * a, b, c, d and converts the result to {@code SmallPojo2} instances.
+ *
+ * <p>The test is disabled with {@code @Ignore}; no reason is recorded here.
+ */
+ @Ignore
+ @Test
+ public void testAsFromTupleToPojo() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+ List<Tuple4<String, Integer, Double, String>> data = new ArrayList<>();
+ data.add(new Tuple4<>("Rofl", 1, 1.0, "Hi"));
+ data.add(new Tuple4<>("lol", 2, 1.0, "Hi"));
+ data.add(new Tuple4<>("Test me", 4, 3.33, "Hello world"));
+
+ Table table = tableEnv
+ .fromDataSet(env.fromCollection(data), "q, w, e, r")
+ .select("q as a, w as b, e as c, r as d");
+
+ DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class);
+ List<SmallPojo2> results = ds.collect();
+ String expected = "Rofl,1,1.0,Hi\n" + "lol,2,1.0,Hi\n" + "Test me,4,3.33,Hello world\n";
+ compareResultAsText(results, expected);
+ }
+
+ /**
+  * Imports a DataSet of {@link SmallPojo} with every field aliased, selects the aliases
+  * and checks the rows produced when converting the table to {@code Row}.
+  */
+ @Test
+ public void testAsFromPojo() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final List<SmallPojo> people = new ArrayList<>();
+     people.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+     people.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+     people.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
+
+     // POJO fields are referenced by name and renamed to a..e.
+     final String aliases = "department AS a, age AS b, salary AS c, name AS d,roles as e";
+
+     final Table aliased = tableEnv
+         .fromDataSet(env.fromCollection(people), aliases)
+         .select("a, b, c, d, e");
+
+     final List<Row> actual = tableEnv.toDataSet(aliased, Row.class).collect();
+
+     final String expected =
+         "Sales,28,4000.0,Peter,[42]\n" +
+         "Engineering,56,10000.0,Anna,[]\n" +
+         "HR,42,6000.0,Lucy,[1, 2, 3]\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /**
+  * Same as the public-field POJO import, but the source type {@link PrivateSmallPojo}
+  * exposes its fields only through getters and setters.
+  */
+ @Test
+ public void testAsFromPrivateFieldsPojo() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final List<PrivateSmallPojo> people = new ArrayList<>();
+     people.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+     people.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+     people.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+     final String aliases = "department AS a, age AS b, salary AS c, name AS d";
+
+     final Table aliased = tableEnv
+         .fromDataSet(env.fromCollection(people), aliases)
+         .select("a, b, c, d");
+
+     final List<Row> actual = tableEnv.toDataSet(aliased, Row.class).collect();
+
+     final String expected =
+         "Sales,28,4000.0,Peter\n" +
+         "Engineering,56,10000.0,Anna\n" +
+         "HR,42,6000.0,Lucy\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /**
+  * Imports {@link SmallPojo} records with aliased fields and converts the table into
+  * {@link SmallPojo2}, whose field names match the aliases.
+  */
+ @Test
+ public void testAsFromAndToPojo() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final List<SmallPojo> people = new ArrayList<>();
+     people.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42}));
+     people.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {}));
+     people.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3}));
+
+     final String aliases = "department AS a, age AS b, salary AS c, name AS d,roles AS e";
+
+     final Table aliased = tableEnv
+         .fromDataSet(env.fromCollection(people), aliases)
+         .select("a, b, c, d, e");
+
+     final List<SmallPojo2> actual = tableEnv.toDataSet(aliased, SmallPojo2.class).collect();
+
+     final String expected =
+         "Sales,28,4000.0,Peter,[42]\n" +
+         "Engineering,56,10000.0,Anna,[]\n" +
+         "HR,42,6000.0,Lucy,[1, 2, 3]\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /**
+  * Imports {@link PrivateSmallPojo} records with aliased fields and converts the table
+  * into {@link PrivateSmallPojo2}, whose private fields are named after the aliases.
+  */
+ @Test
+ public void testAsFromAndToPrivateFieldPojo() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final List<PrivateSmallPojo> people = new ArrayList<>();
+     people.add(new PrivateSmallPojo("Peter", 28, 4000.00, "Sales"));
+     people.add(new PrivateSmallPojo("Anna", 56, 10000.00, "Engineering"));
+     people.add(new PrivateSmallPojo("Lucy", 42, 6000.00, "HR"));
+
+     final String aliases = "department AS a, age AS b, salary AS c, name AS d";
+
+     final Table aliased = tableEnv
+         .fromDataSet(env.fromCollection(people), aliases)
+         .select("a, b, c, d");
+
+     final List<PrivateSmallPojo2> actual =
+         tableEnv.toDataSet(aliased, PrivateSmallPojo2.class).collect();
+
+     final String expected =
+         "Sales,28,4000.0,Peter\n" +
+         "Engineering,56,10000.0,Anna\n" +
+         "HR,42,6000.0,Lucy\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /**
+  * Imports a POJO that contains a {@code HashMap} and an {@code ArrayList} field,
+  * duplicates the map column and compares the column with its copy.
+  */
+ @Test
+ public void testAsWithPojoAndGenericTypes() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final HashMap<String, String> annaMap = new HashMap<>();
+     annaMap.put("test1", "test1");
+     final HashMap<String, String> lucyMap = new HashMap<>();
+     lucyMap.put("abc", "cde");
+
+     final List<PojoWithGeneric> people = new ArrayList<>();
+     people.add(new PojoWithGeneric("Peter", 28, new HashMap<String, String>(), new ArrayList<String>()));
+     people.add(new PojoWithGeneric("Anna", 56, annaMap, new ArrayList<String>()));
+     people.add(new PojoWithGeneric("Lucy", 42, lucyMap, new ArrayList<String>()));
+
+     final String aliases = "name AS a, age AS b, generic AS c, generic2 AS d";
+
+     // The second projection checks that the map column equals its own copy.
+     final Table compared = tableEnv
+         .fromDataSet(env.fromCollection(people), aliases)
+         .select("a, b, c, c as c2, d")
+         .select("a, b, c, c === c2, d");
+
+     final List<Row> actual = tableEnv.toDataSet(compared, Row.class).collect();
+
+     final String expected =
+         "Peter,28,{},true,[]\n" +
+         "Anna,56,{test1=test1},true,[]\n" +
+         "Lucy,42,{abc=cde},true,[]\n";
+     compareResultAsText(actual, expected);
+ }
+
+ /**
+  * A {@code DataSet<Row>} that only carries generic type information must be rejected
+  * when it is imported as a table.
+  */
+ @Test(expected = TableException.class)
+ public void testGenericRow() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     // A null value is used to enforce a generic type for the rows.
+     final DataSet<Row> genericRows = env.fromElements(Row.of(1, 2L, "Hello", null));
+     final TypeInformation<Row> rowType = genericRows.getType();
+     assertTrue(rowType instanceof GenericTypeInfo);
+     assertTrue(rowType.getTypeClass().equals(Row.class));
+
+     // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+     tableEnv.fromDataSet(genericRows);
+ }
+
+ /**
+  * Like {@code testGenericRow}, but the import additionally specifies a field name.
+  */
+ @Test(expected = TableException.class)
+ public void testGenericRowWithAlias() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     // A null value is used to enforce a generic type for the rows.
+     final DataSet<Row> genericRows = env.fromElements(Row.of((Integer) null));
+     final TypeInformation<Row> rowType = genericRows.getType();
+     assertTrue(rowType instanceof GenericTypeInfo);
+     assertTrue(rowType.getTypeClass().equals(Row.class));
+
+     // Must fail. Cannot import DataSet<Row> with GenericTypeInfo.
+     tableEnv.fromDataSet(genericRows, "nullField");
+ }
+
+ /**
+  * Naming four fields for a three-field tuple DataSet must raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testAsWithToManyFields() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+     // Must fail. Too many field names specified.
+     tableEnv.fromDataSet(input, "a, b, c, d");
+ }
+
+ /**
+  * Using the same field name twice must raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testAsWithAmbiguousFields() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+     // Must fail. Specified field names are not unique.
+     tableEnv.fromDataSet(input, "a, b, b");
+ }
+
+ /**
+  * An arithmetic expression in the field list must raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testAsWithNonFieldReference1() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+     // Must fail. as() only allows field name expressions.
+     tableEnv.fromDataSet(input, "a + 1, b, c");
+ }
+
+ /**
+  * An alias expression in the field list of a tuple DataSet must raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testAsWithNonFieldReference2() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+
+     // Must fail. as() only allows field name expressions.
+     tableEnv.fromDataSet(input, "a as foo, b, c");
+ }
+
+ /**
+  * Importing a DataSet whose element type is the non-static inner class
+  * {@link MyNonStatic} must raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testNonStaticClassInput() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final DataSet<MyNonStatic> input = env.fromElements(new MyNonStatic());
+
+     // Must fail since class is not static
+     tableEnv.fromDataSet(input, "name");
+ }
+
+ /**
+  * Converting a table into the non-static inner class {@link MyNonStatic}
+  * must raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testNonStaticClassOutput() throws Exception {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     final DataSet<Integer> numbers = env.fromElements(1, 2, 3);
+     final Table numberTable = tableEnv.fromDataSet(numbers, "number");
+
+     // Must fail since class is not static
+     tableEnv.toDataSet(numberTable, MyNonStatic.class);
+ }
+
+ /**
+  * Replaces both the logical and the physical optimization rule sets with empty ones
+  * and expects the table-to-DataSet conversion to raise a {@code TableException}.
+  */
+ @Test(expected = TableException.class)
+ public void testCustomCalciteConfig() {
+     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+     final BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
+
+     // Calcite configuration without any logical or physical optimization rules.
+     final CalciteConfigBuilder builder = new CalciteConfigBuilder();
+     builder.replaceLogicalOptRuleSet(RuleSets.ofList());
+     builder.replacePhysicalOptRuleSet(RuleSets.ofList());
+     final CalciteConfig emptyRules = builder.build();
+     tableEnv.getConfig().setCalciteConfig(emptyRules);
+
+     final DataSet<Tuple3<Integer, Long, String>> input = CollectionDataSets.get3TupleDataSet(env);
+     final Table imported = tableEnv.fromDataSet(input);
+     tableEnv.toDataSet(imported, Row.class);
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Non-static inner class with a single public {@code int} field. The tests in this
+ * class use it as a DataSet input and output type and expect a TableException.
+ */
+ public class MyNonStatic {
+ public int number;
+ }
+
+ /**
+  * POJO with public fields describing a person: name, age, salary, department and roles.
+  */
+ @SuppressWarnings("unused")
+ public static class SmallPojo {
+
+     public String name;
+     public int age;
+     public double salary;
+     public String department;
+     public Integer[] roles;
+
+     /** No-argument constructor; leaves every field at its default value. */
+     public SmallPojo() { }
+
+     /** Creates a fully populated instance. */
+     public SmallPojo(String name, int age, double salary, String department, Integer[] roles) {
+         this.name = name;
+         this.age = age;
+         this.salary = salary;
+         this.department = department;
+         this.roles = roles;
+     }
+ }
+
+ /**
+ * POJO with generic fields.
+ */
+ @SuppressWarnings("unused")
+ public static class PojoWithGeneric {
+ public String name;
+ public int age;
+ public HashMap<String, String> generic;
+ public ArrayList<String> generic2;
+
+ public PojoWithGeneric() {
+ // default constructor
+ }
+
+ public PojoWithGeneric(String name, int age, HashMap<String, String> generic,
+ ArrayList<String> generic2) {
+ this.name = name;
+ this.age = age;
+ this.generic = generic;
+ this.generic2 = generic2;
+ }
+
+ @Override
+ public String toString() {
+ return name + "," + age + "," + generic + "," + generic2;
+ }
+ }
+
+ /**
+  * Person POJO whose fields are private and exposed through getters and setters.
+  */
+ @SuppressWarnings("unused")
+ public static class PrivateSmallPojo {
+
+     private String name;
+     private int age;
+     private double salary;
+     private String department;
+
+     /** No-argument constructor; leaves every field at its default value. */
+     public PrivateSmallPojo() { }
+
+     /** Creates a fully populated instance. */
+     public PrivateSmallPojo(String name, int age, double salary, String department) {
+         this.name = name;
+         this.age = age;
+         this.salary = salary;
+         this.department = department;
+     }
+
+     // ---- name ----
+
+     public String getName() {
+         return this.name;
+     }
+
+     public void setName(String name) {
+         this.name = name;
+     }
+
+     // ---- age ----
+
+     public int getAge() {
+         return this.age;
+     }
+
+     public void setAge(int age) {
+         this.age = age;
+     }
+
+     // ---- salary ----
+
+     public double getSalary() {
+         return this.salary;
+     }
+
+     public void setSalary(double salary) {
+         this.salary = salary;
+     }
+
+     // ---- department ----
+
+     public String getDepartment() {
+         return this.department;
+     }
+
+     public void setDepartment(String department) {
+         this.department = department;
+     }
+ }
+
+ /**
+  * POJO with public fields named a to e; {@code toString()} renders them comma-separated.
+  */
+ @SuppressWarnings("unused")
+ public static class SmallPojo2 {
+
+     public String a;
+     public int b;
+     public double c;
+     public String d;
+     public Integer[] e;
+
+     /** No-argument constructor; leaves every field at its default value. */
+     public SmallPojo2() { }
+
+     /** Creates a fully populated instance. */
+     public SmallPojo2(String a, int b, double c, String d, Integer[] e) {
+         this.a = a;
+         this.b = b;
+         this.c = c;
+         this.d = d;
+         this.e = e;
+     }
+
+     /** Returns the fields joined by commas; the array is rendered with {@code Arrays.toString}. */
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder();
+         text.append(a).append(",");
+         text.append(b).append(",");
+         text.append(c).append(",");
+         text.append(d).append(",");
+         text.append(Arrays.toString(e));
+         return text.toString();
+     }
+ }
+
+ /**
+  * POJO with private fields named a to d, exposed through getters and setters;
+  * {@code toString()} renders them comma-separated.
+  */
+ @SuppressWarnings("unused")
+ public static class PrivateSmallPojo2 {
+
+     private String a;
+     private int b;
+     private double c;
+     private String d;
+
+     /** No-argument constructor; leaves every field at its default value. */
+     public PrivateSmallPojo2() { }
+
+     /** Creates a fully populated instance. */
+     public PrivateSmallPojo2(String a, int b, double c, String d) {
+         this.a = a;
+         this.b = b;
+         this.c = c;
+         this.d = d;
+     }
+
+     // ---- a ----
+
+     public String getA() {
+         return this.a;
+     }
+
+     public void setA(String a) {
+         this.a = a;
+     }
+
+     // ---- b ----
+
+     public int getB() {
+         return this.b;
+     }
+
+     public void setB(int b) {
+         this.b = b;
+     }
+
+     // ---- c ----
+
+     public double getC() {
+         return this.c;
+     }
+
+     public void setC(double c) {
+         this.c = c;
+     }
+
+     // ---- d ----
+
+     public String getD() {
+         return this.d;
+     }
+
+     public void setD(String d) {
+         this.d = d;
+     }
+
+     /** Returns the four fields joined by commas. */
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder();
+         text.append(a).append(",");
+         text.append(b).append(",");
+         text.append(c).append(",");
+         text.append(d);
+         return text.toString();
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
new file mode 100644
index 0000000..c6368d4
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/stream/sql/JavaSqlITCase.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.sql;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.runtime.utils.JavaStreamTestData;
+import org.apache.flink.table.runtime.utils.StreamITCase;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Integration tests that run SQL queries on streaming tables and compare the rows
+ * collected by {@code StreamITCase.StringSink} with an expected list.
+ */
+public class JavaSqlITCase extends StreamingMultipleProgramsTestBase {
+
+    /**
+     * Registers a table built from a Row stream with explicit field names and projects two columns.
+     */
+    @Test
+    public void testRowRegisterRowWithNames() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        final List<Row> rows = new ArrayList<>();
+        rows.add(Row.of(1, 1L, "Hi"));
+        rows.add(Row.of(2, 2L, "Hello"));
+        rows.add(Row.of(3, 2L, "Hello world"));
+
+        // Row type with named fields a (int), b (long) and c (string).
+        final RowTypeInfo rowType = new RowTypeInfo(
+            new TypeInformation<?>[] {
+                BasicTypeInfo.INT_TYPE_INFO,
+                BasicTypeInfo.LONG_TYPE_INFO,
+                BasicTypeInfo.STRING_TYPE_INFO},
+            new String[] {"a", "b", "c"});
+
+        final DataStream<Row> rowStream = env.fromCollection(rows).returns(rowType);
+        tableEnv.registerTable("MyTableRow", tableEnv.fromDataStream(rowStream, "a,b,c"));
+
+        executeAndVerify(env, tableEnv,
+            "SELECT a,c FROM MyTableRow",
+            "1,Hi",
+            "2,Hello",
+            "3,Hello world");
+    }
+
+    /**
+     * Selects all columns of a registered three-field tuple table.
+     */
+    @Test
+    public void testSelect() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        final DataStream<Tuple3<Integer, Long, String>> tuples = JavaStreamTestData.getSmall3TupleDataSet(env);
+        tableEnv.registerTable("MyTable", tableEnv.fromDataStream(tuples, "a,b,c"));
+
+        executeAndVerify(env, tableEnv,
+            "SELECT * FROM MyTable",
+            "1,1,Hi",
+            "2,2,Hello",
+            "3,2,Hello world");
+    }
+
+    /**
+     * Filters a registered five-field tuple stream with a WHERE clause.
+     */
+    @Test
+    public void testFilter() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        final DataStream<Tuple5<Integer, Long, Integer, String, Long>> tuples =
+            JavaStreamTestData.get5TupleDataStream(env);
+        tableEnv.registerDataStream("MyTable", tuples, "a, b, c, d, e");
+
+        executeAndVerify(env, tableEnv,
+            "SELECT a, b, e FROM MyTable WHERE c < 4",
+            "1,1,1",
+            "2,2,2",
+            "2,3,1",
+            "3,4,2");
+    }
+
+    /**
+     * Unions a three-field table with a filtered projection of a five-field table.
+     */
+    @Test
+    public void testUnion() throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        final StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
+        StreamITCase.clear();
+
+        final DataStream<Tuple3<Integer, Long, String>> small = JavaStreamTestData.getSmall3TupleDataSet(env);
+        tableEnv.registerTable("T1", tableEnv.fromDataStream(small, "a,b,c"));
+
+        final DataStream<Tuple5<Integer, Long, Integer, String, Long>> large =
+            JavaStreamTestData.get5TupleDataStream(env);
+        tableEnv.registerDataStream("T2", large, "a, b, d, c, e");
+
+        executeAndVerify(env, tableEnv,
+            "SELECT * FROM T1 UNION ALL (SELECT a, b, c FROM T2 WHERE a < 3)",
+            "1,1,Hi",
+            "2,2,Hello",
+            "3,2,Hello world",
+            "1,1,Hallo",
+            "2,2,Hallo Welt",
+            "2,3,Hallo Welt wie");
+    }
+
+    /**
+     * Runs the query, sends the result as an append stream of rows into the string sink,
+     * executes the job and compares the collected rows with the expected ones.
+     */
+    private static void executeAndVerify(
+            StreamExecutionEnvironment env,
+            StreamTableEnvironment tableEnv,
+            String sqlQuery,
+            String... expectedRows) throws Exception {
+
+        final Table result = tableEnv.sql(sqlQuery);
+
+        final DataStream<Row> resultSet = tableEnv.toAppendStream(result, Row.class);
+        resultSet.addSink(new StreamITCase.StringSink<Row>());
+        env.execute();
+
+        final List<String> expected = new ArrayList<>();
+        for (String row : expectedRows) {
+            expected.add(row);
+        }
+
+        StreamITCase.compareWithList(expected);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java
new file mode 100644
index 0000000..6221834
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaPojos.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import java.io.Serializable;
+import java.sql.Timestamp;
+
+/**
+ * Container for the POJO types used by the Table API tests.
+ */
+public class JavaPojos {
+
+    /**
+     * Serializable POJO holding a timestamp and a message.
+     */
+    public static class Pojo1 implements Serializable {
+
+        public Timestamp ts;
+        public String msg;
+
+        /** Renders the POJO as {@code Pojo1{ts=..., msg='...'}}. */
+        @Override
+        public String toString() {
+            final StringBuilder text = new StringBuilder("Pojo1{");
+            text.append("ts=").append(ts);
+            text.append(", msg='").append(msg).append('\'');
+            text.append('}');
+            return text.toString();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java
new file mode 100644
index 0000000..b42c119
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaStreamTestData.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Factory methods for the tuple streams used by the streaming Table API tests.
+ */
+public class JavaStreamTestData {
+
+    /**
+     * Creates a stream of three (Integer, Long, String) tuples. The list is shuffled
+     * before it is handed to the environment.
+     *
+     * @param env environment used to create the stream
+     * @return stream over the three tuples
+     */
+    public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
+        final List<Tuple3<Integer, Long, String>> tuples = new ArrayList<>();
+        tuples.add(new Tuple3<>(1, 1L, "Hi"));
+        tuples.add(new Tuple3<>(2, 2L, "Hello"));
+        tuples.add(new Tuple3<>(3, 2L, "Hello world"));
+
+        Collections.shuffle(tuples);
+
+        final DataStream<Tuple3<Integer, Long, String>> stream = env.fromCollection(tuples);
+        return stream;
+    }
+
+    /**
+     * Creates a stream of fifteen (Integer, Long, Integer, String, Long) tuples in the
+     * order in which they are listed here.
+     *
+     * @param env environment used to create the stream
+     * @return stream over the fifteen tuples
+     */
+    public static DataStream<Tuple5<Integer, Long, Integer, String, Long>> get5TupleDataStream(StreamExecutionEnvironment env) {
+        final List<Tuple5<Integer, Long, Integer, String, Long>> tuples = new ArrayList<>();
+
+        tuples.add(new Tuple5<>(1, 1L, 0, "Hallo", 1L));
+
+        tuples.add(new Tuple5<>(2, 2L, 1, "Hallo Welt", 2L));
+        tuples.add(new Tuple5<>(2, 3L, 2, "Hallo Welt wie", 1L));
+
+        tuples.add(new Tuple5<>(3, 4L, 3, "Hallo Welt wie gehts?", 2L));
+        tuples.add(new Tuple5<>(3, 5L, 4, "ABC", 2L));
+        tuples.add(new Tuple5<>(3, 6L, 5, "BCD", 3L));
+
+        tuples.add(new Tuple5<>(4, 7L, 6, "CDE", 2L));
+        tuples.add(new Tuple5<>(4, 8L, 7, "DEF", 1L));
+        tuples.add(new Tuple5<>(4, 9L, 8, "EFG", 1L));
+        tuples.add(new Tuple5<>(4, 10L, 9, "FGH", 2L));
+
+        tuples.add(new Tuple5<>(5, 11L, 10, "GHI", 1L));
+        tuples.add(new Tuple5<>(5, 12L, 11, "HIJ", 3L));
+        tuples.add(new Tuple5<>(5, 13L, 12, "IJK", 3L));
+        // The last two entries are intentionally kept in their original (15, 14) order.
+        tuples.add(new Tuple5<>(5, 15L, 14, "KLM", 2L));
+        tuples.add(new Tuple5<>(5, 14L, 13, "JKL", 2L));
+
+        final DataStream<Tuple5<Integer, Long, Integer, String, Long>> stream = env.fromCollection(tuples);
+        return stream;
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
new file mode 100644
index 0000000..4d06bc2
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.functions.AggregateFunction;
+
+import java.util.Iterator;
+
+/**
+ * Test aggregator functions.
+ */
+public class JavaUserDefinedAggFunctions {
+    /**
+     * Accumulator for test requiresOver.
+     */
+    public static class Accumulator0 extends Tuple2<Long, Integer>{}
+
+    /**
+     * Test for requiresOver: an aggregate that always yields 1 and declares that it
+     * requires an OVER window.
+     */
+    public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
+        @Override
+        public Accumulator0 createAccumulator() {
+            return new Accumulator0();
+        }
+
+        @Override
+        public Long getValue(Accumulator0 accumulator) {
+            return 1L;
+        }
+
+        /** Intentionally empty; the accumulated values are ignored. */
+        public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
+        }
+
+        @Override
+        public boolean requiresOver() {
+            return true;
+        }
+    }
+
+    /**
+     * Accumulator for WeightedAvg: the weighted sum of the values and the sum of the weights.
+     */
+    public static class WeightedAvgAccum {
+        public long sum = 0;
+        public int count = 0;
+    }
+
+    /**
+     * Base class for WeightedAvg.
+     */
+    public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccum> {
+        @Override
+        public WeightedAvgAccum createAccumulator() {
+            return new WeightedAvgAccum();
+        }
+
+        /**
+         * Returns the weighted sum divided by the total weight (integer division),
+         * or {@code null} when the total weight is zero.
+         */
+        @Override
+        public Long getValue(WeightedAvgAccum accumulator) {
+            if (accumulator.count == 0) {
+                return null;
+            } else {
+                return accumulator.sum / accumulator.count;
+            }
+        }
+
+        /** Adds {@code iValue * iWeight} to the sum and {@code iWeight} to the count. */
+        public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+            accumulator.sum += iValue * iWeight;
+            accumulator.count += iWeight;
+        }
+
+        /** Overload of {@code accumulate} for int values. */
+        public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+            // Widen before multiplying so the product cannot overflow 32-bit int arithmetic.
+            accumulator.sum += (long) iValue * iWeight;
+            accumulator.count += iWeight;
+        }
+    }
+
+    /**
+     * A WeightedAvg class with merge method.
+     */
+    public static class WeightedAvgWithMerge extends WeightedAvg {
+        /** Adds the sum and count of every accumulator in {@code it} to {@code acc}. */
+        public void merge(WeightedAvgAccum acc, Iterable<WeightedAvgAccum> it) {
+            for (WeightedAvgAccum other : it) {
+                acc.count += other.count;
+                acc.sum += other.sum;
+            }
+        }
+    }
+
+    /**
+     * A WeightedAvg class with merge and reset method.
+     */
+    public static class WeightedAvgWithMergeAndReset extends WeightedAvgWithMerge {
+        /** Sets both sum and count of {@code acc} back to zero. */
+        public void resetAccumulator(WeightedAvgAccum acc) {
+            acc.count = 0;
+            acc.sum = 0L;
+        }
+    }
+
+    /**
+     * A WeightedAvg class with retract method.
+     */
+    public static class WeightedAvgWithRetract extends WeightedAvg {
+        /** Subtracts {@code iValue * iWeight} from the sum and {@code iWeight} from the count. */
+        public void retract(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+            accumulator.sum -= iValue * iWeight;
+            accumulator.count -= iWeight;
+        }
+
+        /** Overload of {@code retract} for int values. */
+        public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+            // Widen before multiplying so the product cannot overflow 32-bit int arithmetic.
+            accumulator.sum -= (long) iValue * iWeight;
+            accumulator.count -= iWeight;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java
new file mode 100644
index 0000000..a77ad9a
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedScalarFunctions.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.table.functions.ScalarFunction;
+
+import java.util.Arrays;
+
+/**
+ * Scalar functions used by the Table API tests.
+ */
+public class JavaUserDefinedScalarFunctions {
+
+    /**
+     * Returns the input plus one.
+     */
+    public static class JavaFunc0 extends ScalarFunction {
+        public long eval(Long l) {
+            final long input = l;
+            return input + 1;
+        }
+    }
+
+    /**
+     * Joins the three inputs with " and ".
+     */
+    public static class JavaFunc1 extends ScalarFunction {
+        public String eval(Integer a, int b, Long c) {
+            final StringBuilder text = new StringBuilder();
+            text.append(a).append(" and ");
+            text.append(b).append(" and ");
+            text.append(c);
+            return text.toString();
+        }
+    }
+
+    /**
+     * Appends the product of the integer arguments to the string.
+     */
+    public static class JavaFunc2 extends ScalarFunction {
+        public String eval(String s, Integer... a) {
+            int product = 1;
+            for (int i = 0; i < a.length; i++) {
+                product *= a[i];
+            }
+            return s + product;
+        }
+    }
+
+    /**
+     * Overloaded function: returns either the number of variable arguments or the single string argument.
+     */
+    public static class JavaFunc3 extends ScalarFunction {
+        public int eval(String a, int... b) {
+            final int argumentCount = b.length;
+            return argumentCount;
+        }
+
+        public String eval(String c) {
+            return c;
+        }
+    }
+
+    /**
+     * Renders both arrays with {@code Arrays.toString} and joins them with " and ".
+     */
+    public static class JavaFunc4 extends ScalarFunction {
+        public String eval(Integer[] a, String[] b) {
+            final String left = Arrays.toString(a);
+            final String right = Arrays.toString(b);
+            return left + " and " + right;
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
new file mode 100644
index 0000000..cd92f49
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedTableFunctions.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.utils;
+
+import org.apache.flink.table.functions.TableFunction;
+
+/**
+ * Table functions for tests, declared as nested {@link TableFunction} subclasses.
+ */
+public class JavaUserDefinedTableFunctions {
+
+ /**
+ * Calls {@code collect} three times per invocation, in order: {@code a.longValue()}, {@code b}, {@code c}; a null {@code a} fails on {@code longValue()}.
+ */
+ public static class JavaTableFunc0 extends TableFunction<Long> {
+ public void eval(Integer a, Long b, Long c) {
+ collect(a.longValue());
+ collect(b);
+ collect(c);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
deleted file mode 100644
index 9bc3e51..0000000
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/ExpressionReductionTest.scala
+++ /dev/null
@@ -1,509 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.table
-
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Types
-import org.apache.flink.table.api.scala._
-import org.apache.flink.table.functions.ScalarFunction
-import org.apache.flink.table.utils.TableTestBase
-import org.apache.flink.table.utils.TableTestUtil._
-import org.junit.{Ignore, Test}
-
-class ExpressionReductionTest extends TableTestBase {
-
- @Test
- def testReduceCalcExpressionForBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceProjectExpressionForBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- )
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceFilterExpressionForBatchSQL(): Unit = {
- val util = batchTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "*" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceCalcExpressionForBatchTableAPI(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceProjectExpressionForBatchTableAPI(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceFilterExpressionForBatchTableAPI(): Unit = {
- val util = batchTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
-
- val expected = unaryNode(
- "DataSetCalc",
- batchTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceCalcExpressionForStreamSQL(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceProjectExpressionForStreamSQL(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "(3+4)+a, " +
- "b+(1+2), " +
- "CASE 11 WHEN 1 THEN 'a' ELSE 'b' END, " +
- "TRIM(BOTH ' STRING '), " +
- "'test' || 'string', " +
- "NULLIF(1, 1), " +
- "TIMESTAMP '1990-10-14 23:00:00.123' + INTERVAL '10 00:00:01' DAY TO SECOND, " +
- "EXTRACT(DAY FROM INTERVAL '19 12:10:10.123' DAY TO SECOND(3)), " +
- "1 IS NULL, " +
- "'TEST' LIKE '%EST', " +
- "FLOOR(2.5), " +
- "'TEST' IN ('west', 'TEST', 'rest'), " +
- "CAST(TRUE AS VARCHAR) || 'X'" +
- "FROM MyTable"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "+(7, a) AS EXPR$0",
- "+(b, 3) AS EXPR$1",
- "'b' AS EXPR$2",
- "'STRING' AS EXPR$3",
- "'teststring' AS EXPR$4",
- "null AS EXPR$5",
- "1990-10-24 23:00:01.123 AS EXPR$6",
- "19 AS EXPR$7",
- "false AS EXPR$8",
- "true AS EXPR$9",
- "2 AS EXPR$10",
- "true AS EXPR$11",
- "'trueX' AS EXPR$12"
- )
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceFilterExpressionForStreamSQL(): Unit = {
- val util = streamTestUtil()
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val sqlQuery = "SELECT " +
- "*" +
- "FROM MyTable WHERE a>(1+7)"
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testReduceCalcExpressionForStreamTableAPI(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- ),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceProjectExpressionForStreamTableAPI(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .select((3 + 4).toExpr + 6,
- (11 === 1) ? ("a", "b"),
- " STRING ".trim,
- "test" + "string",
- "1990-10-14 23:00:00.123".toTimestamp + 10.days + 1.second,
- 1.isNull,
- "TEST".like("%EST"),
- 2.5.toExpr.floor(),
- true.cast(Types.STRING) + "X")
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select",
- "13 AS _c0",
- "'b' AS _c1",
- "'STRING' AS _c2",
- "'teststring' AS _c3",
- "1990-10-24 23:00:01.123 AS _c4",
- "false AS _c5",
- "true AS _c6",
- "2E0 AS _c7",
- "'trueX' AS _c8"
- )
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceFilterExpressionForStreamTableAPI(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .where('a > (1 + 7))
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "b", "c"),
- term("where", ">(a, 8)")
- )
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testNestedTablesReductionStream(): Unit = {
- val util = streamTestUtil()
-
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
- util.tableEnv.registerTable("NewTable", newTable)
-
- val sqlQuery = "SELECT a FROM NewTable"
-
- // 1+1 should be normalized to 2
- val expected = unaryNode("DataStreamCalc", streamTableNode(0), term("select", "+(2, a) AS a"))
-
- util.verifySql(sqlQuery, expected)
- }
-
- @Test
- def testNestedTablesReductionBatch(): Unit = {
- val util = batchTestUtil()
-
- util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val newTable = util.tableEnv.sql("SELECT 1 + 1 + a AS a FROM MyTable")
-
- util.tableEnv.registerTable("NewTable", newTable)
-
- val sqlQuery = "SELECT a FROM NewTable"
-
- // 1+1 should be normalized to 2
- val expected = unaryNode("DataSetCalc", batchTableNode(0), term("select", "+(2, a) AS a"))
-
- util.verifySql(sqlQuery, expected)
- }
-
- // todo this NPE is caused by Calcite, it shall pass when [CALCITE-1860] is fixed
- @Ignore
- def testReduceDeterministicUDF(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- // if isDeterministic = true, will cause a Calcite NPE, which will be fixed in [CALCITE-1860]
- val result = table
- .select('a, 'b, 'c, DeterministicNullFunc() as 'd)
- .where("d.isNull")
- .select('a, 'b, 'c)
-
- val expected: String = streamTableNode(0)
-
- util.verifyTable(result, expected)
- }
-
- @Test
- def testReduceNonDeterministicUDF(): Unit = {
- val util = streamTestUtil()
- val table = util.addTable[(Int, Long, String)]("MyTable", 'a, 'b, 'c)
-
- val result = table
- .select('a, 'b, 'c, NonDeterministicNullFunc() as 'd)
- .where("d.isNull")
- .select('a, 'b, 'c)
-
- val expected = unaryNode(
- "DataStreamCalc",
- streamTableNode(0),
- term("select", "a", "b", "c"),
- term("where", s"IS NULL(${NonDeterministicNullFunc.functionIdentifier}())")
- )
-
- util.verifyTable(result, expected)
- }
-
-}
-
-object NonDeterministicNullFunc extends ScalarFunction {
- def eval(): String = null
- override def isDeterministic = false
-}
-
-object DeterministicNullFunc extends ScalarFunction {
- def eval(): String = null
- override def isDeterministic = true
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f1fafc0e/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
new file mode 100644
index 0000000..a2356a6
--- /dev/null
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/ExternalCatalogTest.scala
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api
+
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.runtime.utils.CommonTestData
+import org.apache.flink.table.utils.TableTestUtil._
+import org.apache.flink.table.utils.TableTestBase
+import org.junit.Test
+
+/**
+ * Plan tests for tables scanned from an external catalog: each test registers the in-memory test catalog as "test" and hands a Table API or SQL query plus the expected plan string to the test util.
+ */
+class ExternalCatalogTest extends TableTestBase {
+ private val table1Path: Array[String] = Array("test", "db1", "tb1")
+ private val table1TopLevelPath: Array[String] = Array("test", "tb1")
+ private val table1ProjectedFields: Array[String] = Array("a", "b", "c")
+ private val table2Path: Array[String] = Array("test", "db2", "tb2")
+ private val table2ProjectedFields: Array[String] = Array("d", "e", "g")
+
+ @Test
+ def testBatchTableApi(): Unit = {
+ val util = batchTestUtil()
+ val tableEnv = util.tableEnv
+
+ tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val table1 = tableEnv.scan("test", "db1", "tb1")
+ val table2 = tableEnv.scan("test", "db2", "tb2")
+ val result = table2
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+ ),
+ term("union", "_c0", "e", "_c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testBatchSQL(): Unit = {
+ val util = batchTestUtil()
+
+ util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+ "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+ term("where", "<(d, 3)")),
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS EXPR$0", "b", "c")
+ ),
+ term("union", "EXPR$0", "e", "g"))
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testStreamTableApi(): Unit = {
+ val util = streamTestUtil()
+ val tableEnv = util.tableEnv
+
+ util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val table1 = tableEnv.scan("test", "db1", "tb1")
+ val table2 = tableEnv.scan("test", "db2", "tb2")
+
+ val result = table2.where("d < 3")
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2"),
+ term("where", "<(d, 3)")
+ ),
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+ ),
+ term("union all", "_c0", "e", "_c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ @Test
+ def testStreamSQL(): Unit = {
+ val util = streamTestUtil()
+
+ util.tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val sqlQuery = "SELECT d * 2, e, g FROM test.db2.tb2 WHERE d < 3 UNION ALL " +
+ "(SELECT a * 2, b, c FROM test.db1.tb1)"
+
+ val expected = binaryNode(
+ "DataStreamUnion",
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS EXPR$0", "e", "g"),
+ term("where", "<(d, 3)")),
+ unaryNode(
+ "DataStreamCalc",
+ sourceStreamTableNode(table1Path, table1ProjectedFields),
+ term("select", "*(a, 2) AS EXPR$0", "b", "c")
+ ),
+ term("union all", "EXPR$0", "e", "g"))
+
+ util.verifySql(sqlQuery, expected)
+ }
+
+ @Test
+ def testTopLevelTable(): Unit = {
+ val util = batchTestUtil()
+ val tableEnv = util.tableEnv
+
+ tableEnv.registerExternalCatalog("test", CommonTestData.getInMemoryTestCatalog)
+
+ val table1 = tableEnv.scan("test", "tb1")
+ val table2 = tableEnv.scan("test", "db2", "tb2")
+ val result = table2
+ .select('d * 2, 'e, 'g.upperCase())
+ .unionAll(table1.select('a * 2, 'b, 'c.upperCase()))
+
+ val expected = binaryNode(
+ "DataSetUnion",
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table2Path, table2ProjectedFields),
+ term("select", "*(d, 2) AS _c0", "e", "UPPER(g) AS _c2")
+ ),
+ unaryNode(
+ "DataSetCalc",
+ sourceBatchTableNode(table1TopLevelPath, table1ProjectedFields),
+ term("select", "*(a, 2) AS _c0", "b", "UPPER(c) AS _c2")
+ ),
+ term("union", "_c0", "e", "_c2")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
+ def sourceBatchTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+ s"BatchTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+ s"fields=[${fields.mkString(", ")}])"
+ }
+
+ def sourceStreamTableNode(sourceTablePath: Array[String], fields: Array[String]): String = {
+ s"StreamTableSourceScan(table=[[${sourceTablePath.mkString(", ")}]], " +
+ s"fields=[${fields.mkString(", ")}])"
+ }
+}