Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/09 14:31:04 UTC
[flink] branch master updated: [FLINK-12951][table-planner] Add logic to bridge DDL with table source/sink
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 77e54cb [FLINK-12951][table-planner] Add logic to bridge DDL with table source/sink
77e54cb is described below
commit 77e54cbf9e2f0ae2bfb59bb0ea54793606134c49
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Wed Jun 5 19:11:54 2019 +0800
[FLINK-12951][table-planner] Add logic to bridge DDL with table source/sink
This closes #8844
---
flink-python/pyflink/table/table_environment.py | 63 +++
.../flink/sql/parser/ddl/SqlCreateTable.java | 4 +
.../apache/flink/table/api/TableEnvironment.java | 71 ++++
.../table/api/internal/TableEnvironmentImpl.java | 78 +++-
.../apache/flink/table/catalog/CatalogManager.java | 42 ++
.../flink/table/catalog/CatalogTableImpl.java | 5 +-
.../table/operations/ddl/CreateOperation.java | 34 ++
.../table/operations/ddl/CreateTableOperation.java | 70 ++++
.../flink/table/catalog/CatalogCalciteSchema.java | 6 +-
.../table/catalog/CatalogManagerCalciteSchema.java | 2 +-
.../flink/table/catalog/DatabaseCalciteSchema.java | 10 +-
.../table/sqlexec/SqlConversionException.java | 35 ++
.../table/sqlexec/SqlExecutableStatements.java | 79 ----
.../table/sqlexec/SqlToOperationConverter.java | 193 +++++++++
.../flink/table/api/internal/TableEnvImpl.scala | 89 +++-
.../flink/table/calcite/FlinkPlannerImpl.scala | 42 +-
.../apache/flink/table/planner/StreamPlanner.scala | 45 +--
.../table/catalog/DatabaseCalciteSchemaTest.java | 2 +-
.../table/sqlexec/SqlToOperationConverterTest.java | 98 +++++
.../org.apache.flink.table.factories.TableFactory | 1 +
.../flink/table/catalog/CatalogTableITCase.scala | 450 +++++++++++++++++++++
.../utils/TestCollectionTableFactory.scala | 265 ++++++++++++
.../flink/table/utils/MockTableEnvironment.scala | 2 +
23 files changed, 1537 insertions(+), 149 deletions(-)
diff --git a/flink-python/pyflink/table/table_environment.py b/flink-python/pyflink/table/table_environment.py
index 3c8b44c..4b248f9 100644
--- a/flink-python/pyflink/table/table_environment.py
+++ b/flink-python/pyflink/table/table_environment.py
@@ -235,6 +235,69 @@ class TableEnvironment(object):
"""
return self._j_tenv.explain(table._j_table)
+ def sql(self, query):
+ """
+ Evaluates a single SQL statement, including DDL and DML statements.
+
+ Note: Always use this interface to execute a SQL statement. It only supports
+ executing one SQL statement at a time.
+
+ A DDL statement can be executed to create/drop a table/view:
+ For example, the DDL statement below would create a CSV table named `tbl1`
+ in the current catalog:
+
+ create table tbl1(
+ a int,
+ b bigint,
+ c varchar
+ ) with (
+ connector = 'csv',
+ csv.path = 'xxx'
+ )
+
+ The returned table differs by kind of statement:
+ DDL: returns None.
+ DML: a SQL INSERT returns None; a SQL query (SELECT) returns a table
+ describing the query result, which can be further queried through the Table API
+ or written directly to a sink with `~Table.insert_into`.
+
+ SQL queries can be executed directly as follows:
+ ::
+ >>> sinkDDL = """create table sinkTable(
+ ...     a int,
+ ...     b varchar
+ ... ) with (
+ ...     connector = 'csv',
+ ...     csv.path = 'xxx'
+ ... )"""
+
+ >>> sourceDDL = """create table sourceTable(
+ ...     a int,
+ ...     b varchar
+ ... ) with (
+ ...     connector = 'kafka',
+ ...     kafka.topic = 'xxx',
+ ...     kafka.endpoint = 'x.x.x'
+ ... )"""
+
+ >>> query = "INSERT INTO sinkTable SELECT * FROM sourceTable"
+
+ >>> tEnv.sql(sourceDDL)
+ >>> tEnv.sql(sinkDDL)
+ >>> tEnv.sql(query)
+ >>> tEnv.execute("MyJob")
+
+ This code snippet creates a job to read data from a Kafka source into a CSV sink.
+
+ :param query: The SQL statement to evaluate.
+ """
+ j_table = self._j_tenv.sql(query)
+ if j_table is None:
+ return None
+ return Table(j_table)
+
def sql_query(self, query):
"""
Evaluates a SQL query on registered tables and retrieves the result as a :class:`Table`.
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 84e4ca2..ec9e562 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -123,6 +123,10 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode {
return comment;
}
+ public boolean isIfNotExists() {
+ return ifNotExists;
+ }
+
public void validate() throws SqlParseException {
Set<String> columnNames = new HashSet<>();
if (columnList != null) {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 10505ac..535a6bc 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -30,6 +30,8 @@ import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
+import javax.annotation.Nullable;
+
import java.util.Optional;
/**
@@ -321,6 +323,69 @@ public interface TableEnvironment {
String[] getCompletionHints(String statement, int position);
/**
+ * Evaluates a single SQL statement, including DDL and DML statements.
+ *
+ * <p>Note: Always use this interface to execute a SQL statement. It only supports
+ * executing one SQL statement at a time.
+ *
+ * <p>A DDL statement can be executed to create/drop a table/view:
+ * For example, the DDL statement below would create a CSV table named `tbl1`
+ * in the current catalog:
+ * <blockquote><pre>
+ * create table tbl1(
+ * a int,
+ * b bigint,
+ * c varchar
+ * ) with (
+ * connector = 'csv',
+ * csv.path = 'xxx'
+ * )
+ * </pre></blockquote>
+ *
+ * <p>The returned table differs by kind of statement:
+ * <ul>
+ * <li>DDL: returns null.</li>
+ * <li>DML: a SQL INSERT returns null; a SQL query (SELECT) returns
+ * a table describing the query result, which can be further queried through
+ * the Table API or written directly to a sink with
+ * {@link #insertInto(Table, String, String...)}.</li>
+ * </ul>
+ *
+ * <p>SQL queries can be executed directly as follows:
+ *
+ * <blockquote><pre>
+ * String sinkDDL = "create table sinkTable(
+ * a int,
+ * b varchar
+ * ) with (
+ * connector = 'csv',
+ * csv.path = 'xxx'
+ * )";
+ *
+ * String sourceDDL = "create table sourceTable(
+ * a int,
+ * b varchar
+ * ) with (
+ * connector = 'kafka',
+ * kafka.topic = 'xxx',
+ * kafka.endpoint = 'x.x.x'
+ * )";
+ *
+ * String query = "INSERT INTO sinkTable SELECT * FROM sourceTable";
+ *
+ * tEnv.sql(sourceDDL);
+ * tEnv.sql(sinkDDL);
+ * tEnv.sql(query);
+ * tEnv.execute("MyJob");
+ * </pre></blockquote>
+ * <p>This code snippet creates a job to read data from a Kafka source into a CSV sink.
+ *
+ * @param statement The SQL statement to evaluate.
+ */
+ @Nullable
+ Table sql(String statement);
+
+ /**
* Evaluates a SQL query on registered tables and retrieves the result as a {@link Table}.
*
* <p>All tables referenced by the query must be registered in the TableEnvironment.
@@ -337,9 +402,12 @@ public interface TableEnvironment {
* }
* </pre>
*
+ * @deprecated Use {@link #sql(String)}.
+ *
* @param query The SQL query to evaluate.
* @return The result of the query as Table
*/
+ @Deprecated
Table sqlQuery(String query);
/**
@@ -362,8 +430,11 @@ public interface TableEnvironment {
* }
* </pre>
*
+ * @deprecated Use {@link #sql(String)}.
+ *
* @param stmt The SQL statement to evaluate.
*/
+ @Deprecated
void sqlUpdate(String stmt);
/**
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 7d509b8..4817dcb 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -56,10 +56,12 @@ import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
+import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
import java.util.ArrayList;
@@ -189,7 +191,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
CatalogBaseTable tableTable = new QueryOperationCatalogView(table.getQueryOperation());
- registerTableInternal(name, tableTable);
+ registerTableInternal(new String[] { name }, tableTable, false);
}
@Override
@@ -291,6 +293,40 @@ public class TableEnvironmentImpl implements TableEnvironment {
}
@Override
+ public Table sql(String statement) {
+ List<Operation> operations = planner.parse(statement);
+ Preconditions.checkState(operations.size() == 1,
+ "sql() only accepts a single SQL statement a time.");
+ Operation operation = operations.get(0);
+ if (operation instanceof CreateTableOperation) {
+ CreateTableOperation createTableOperation = (CreateTableOperation) operation;
+ registerTableInternal(
+ createTableOperation.getTablePath(),
+ createTableOperation.getCatalogTable(),
+ createTableOperation.isIgnoreIfExists());
+ // returns null for DDL statement now.
+ return null;
+ } else if (operation instanceof ModifyOperation) {
+ List<ModifyOperation> modifyOperations =
+ Collections.singletonList((ModifyOperation) operation);
+ if (isEagerOperationTranslation()) {
+ translate(modifyOperations);
+ } else {
+ buffer(modifyOperations);
+ }
+ // returns null for SQL INSERT statement now.
+ return null;
+ } else if (operation instanceof QueryOperation){
+ return createTable((QueryOperation) operation);
+ } else {
+ throw new ValidationException(
+ "Unsupported SQL query! sqlQuery() only accepts a single SQL query of type " +
+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY; or DDL of type " +
+ "CREATE TABLE, CREATE VIEW; or DML of type INSERT INTO.");
+ }
+ }
+
+ @Override
public Table sqlQuery(String query) {
List<Operation> operations = planner.parse(query);
@@ -418,17 +454,31 @@ public class TableEnvironmentImpl implements TableEnvironment {
bufferedModifyOperations.addAll(modifyOperations);
}
- protected void registerTableInternal(String name, CatalogBaseTable table) {
+ /**
+ * Registers a {@link CatalogBaseTable} under a given object path. The {@code path} can be
+ * given in one of three formats:
+ * <ol>
+ * <li>`catalog.db.table`: A full table path including the catalog name,
+ * the database name and table name.</li>
+ * <li>`db.table`: the database name followed by the table name, resolved against the current catalog.</li>
+ * <li>`table`: only the table name, resolved against the current catalog and database.</li>
+ * </ol>
+ * The registered tables can then be referenced in SQL queries.
+ *
+ * @param path The path under which the table will be registered
+ * @param catalogTable The table to register
+ * @param ignoreIfExists If true, do nothing if a table with the same name already exists under
+ * the {@code path}. If false, a TableAlreadyExistException is thrown.
+ */
+ private void registerTableInternal(String[] path,
+ CatalogBaseTable catalogTable,
+ boolean ignoreIfExists) {
+ String[] fullName = catalogManager.getFullTablePath(Arrays.asList(path));
+ Catalog catalog = getCatalog(fullName[0]).orElseThrow(() ->
+ new TableException("Catalog " + fullName[0] + " does not exist"));
+ ObjectPath objectPath = new ObjectPath(fullName[1], fullName[2]);
try {
- checkValidTableName(name);
- ObjectPath path = new ObjectPath(defaultDatabaseName, name);
- Optional<Catalog> catalog = catalogManager.getCatalog(defaultCatalogName);
- if (catalog.isPresent()) {
- catalog.get().createTable(
- path,
- table,
- false);
- }
+ catalog.createTable(objectPath, catalogTable, ignoreIfExists);
} catch (Exception e) {
throw new TableException("Could not register table", e);
}
@@ -477,7 +527,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
"Table '%s' already exists. Please choose a different name.", name));
}
} else {
- registerTableInternal(name, ConnectorCatalogTable.source(tableSource, false));
+ registerTableInternal(new String[] { name },
+ ConnectorCatalogTable.source(tableSource, false), false);
}
}
@@ -502,7 +553,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
"Table '%s' already exists. Please choose a different name.", name));
}
} else {
- registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, false));
+ registerTableInternal(new String[] { name },
+ ConnectorCatalogTable.sink(tableSink, false), false);
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
index c965a98..0f83176 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.catalog;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.CatalogNotExistException;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.TableFactoryUtil;
@@ -387,4 +388,45 @@ public class CatalogManager {
return tableSink.getTableSchema();
}
}
+
+ /**
+ * Returns the full name of the given table path; the name may be padded
+ * with the current catalog/database name depending on the length of {@code paths}.
+ *
+ * @param paths Table paths whose format can be "catalog.db.table", "db.table" or "table"
+ * @return The complete table path as an array of {catalog, database, table}
+ */
+ public String[] getFullTablePath(List<String> paths) {
+ if (paths == null) {
+ throw new ValidationException("Table paths can not be null!");
+ }
+ if (paths.size() < 1 || paths.size() > 3) {
+ throw new ValidationException("Table paths length must be " +
+ "between 1(inclusive) and 3(inclusive)");
+ }
+ if (paths.stream().anyMatch(StringUtils::isNullOrWhitespaceOnly)) {
+ throw new ValidationException("Table paths contain null or " +
+ "while-space-only string");
+ }
+
+ if (paths.size() == 3) {
+ return new String[] {paths.get(0), paths.get(1), paths.get(2)};
+ }
+
+ String catalogName;
+ String dbName;
+ String tableName;
+
+ if (paths.size() == 1) {
+ catalogName = getCurrentCatalog();
+ dbName = getCurrentDatabase();
+ tableName = paths.get(0);
+ } else {
+ catalogName = getCurrentCatalog();
+ dbName = paths.get(0);
+ tableName = paths.get(1);
+ }
+
+ return new String[]{ catalogName, dbName, tableName };
+ }
}
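As a side note, the padding rule that getFullTablePath implements can be illustrated with a
small standalone sketch (illustrative only, not part of this commit; it re-implements the rule
described in the javadoc above, and the catalog/database names are placeholders):

    import java.util.Arrays;
    import java.util.List;

    public class FullTablePathSketch {
        // Mirrors CatalogManager#getFullTablePath: pad a 1-, 2- or 3-part path
        // with the current catalog and database.
        static String[] pad(List<String> parts, String currentCatalog, String currentDatabase) {
            if (parts.size() == 3) {
                return parts.toArray(new String[0]);
            } else if (parts.size() == 2) {
                return new String[] {currentCatalog, parts.get(0), parts.get(1)};
            } else {
                return new String[] {currentCatalog, currentDatabase, parts.get(0)};
            }
        }

        public static void main(String[] args) {
            // "tbl"          -> [myCatalog, myDb, tbl]
            // "db1.tbl"      -> [myCatalog, db1, tbl]
            // "cat1.db1.tbl" -> [cat1, db1, tbl]
            System.out.println(Arrays.toString(pad(Arrays.asList("tbl"), "myCatalog", "myDb")));
            System.out.println(Arrays.toString(pad(Arrays.asList("db1", "tbl"), "myCatalog", "myDb")));
            System.out.println(Arrays.toString(pad(Arrays.asList("cat1", "db1", "tbl"), "myCatalog", "myDb")));
        }
    }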
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index df15a95..5566fc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -52,7 +52,10 @@ public class CatalogTableImpl extends AbstractCatalogTable {
@Override
public CatalogBaseTable copy() {
return new CatalogTableImpl(
- getSchema().copy(), new ArrayList<>(getPartitionKeys()), new HashMap<>(getProperties()), getComment());
+ getSchema().copy(),
+ new ArrayList<>(getPartitionKeys()),
+ new HashMap<>(getProperties()),
+ getComment());
}
@Override
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java
new file mode 100644
index 0000000..fa8d650
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateOperation.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * An {@link Operation} that describes a DDL statement, e.g. CREATE TABLE or CREATE FUNCTION.
+ *
+ * <p>Different sub-operations carry their own specific instances. For example, a
+ * create table operation will have a {@link org.apache.flink.table.catalog.CatalogTable} instance,
+ * while a create function operation will have a
+ * {@link org.apache.flink.table.catalog.CatalogFunction} instance.
+ */
+@Internal
+public interface CreateOperation extends Operation {
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
new file mode 100644
index 0000000..3c806f6
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateTableOperation.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a CREATE TABLE statement.
+ */
+public class CreateTableOperation implements CreateOperation {
+ private final String[] tablePath;
+ private CatalogTable catalogTable;
+ private boolean ignoreIfExists;
+
+ public CreateTableOperation(String[] tablePath,
+ CatalogTable catalogTable,
+ boolean ignoreIfExists) {
+ this.tablePath = tablePath;
+ this.catalogTable = catalogTable;
+ this.ignoreIfExists = ignoreIfExists;
+ }
+
+ public CatalogTable getCatalogTable() {
+ return catalogTable;
+ }
+
+ public String[] getTablePath() {
+ return tablePath;
+ }
+
+ public boolean isIgnoreIfExists() {
+ return ignoreIfExists;
+ }
+
+ @Override
+ public String asSummaryString() {
+ Map<String, Object> params = new LinkedHashMap<>();
+ params.put("catalogTable", catalogTable.toProperties());
+ params.put("tablePath", tablePath);
+ params.put("ignoreIfExists", ignoreIfExists);
+
+ return OperationUtils.formatWithChildren(
+ "CREATE TABLE",
+ params,
+ Collections.emptyList(),
+ Operation::asSummaryString);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
index 0790b80..5025adf 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogCalciteSchema.java
@@ -41,10 +41,12 @@ import java.util.Set;
@Internal
public class CatalogCalciteSchema implements Schema {
+ private final boolean isBatch;
private final String catalogName;
private final Catalog catalog;
- public CatalogCalciteSchema(String catalogName, Catalog catalog) {
+ public CatalogCalciteSchema(boolean isBatch, String catalogName, Catalog catalog) {
+ this.isBatch = isBatch;
this.catalogName = catalogName;
this.catalog = catalog;
}
@@ -59,7 +61,7 @@ public class CatalogCalciteSchema implements Schema {
public Schema getSubSchema(String schemaName) {
if (catalog.databaseExists(schemaName)) {
- return new DatabaseCalciteSchema(schemaName, catalogName, catalog);
+ return new DatabaseCalciteSchema(isBatch, schemaName, catalogName, catalog);
} else {
return null;
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
index cccd275..ceef249 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/CatalogManagerCalciteSchema.java
@@ -93,7 +93,7 @@ public class CatalogManagerCalciteSchema implements Schema {
return externalSchema.orElseGet(() ->
catalogManager.getCatalog(name)
- .map(catalog -> new CatalogCalciteSchema(name, catalog))
+ .map(catalog -> new CatalogCalciteSchema(isBatch, name, catalog))
.orElse(null)
);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
index 95475ef..ac7cdb8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/catalog/DatabaseCalciteSchema.java
@@ -51,11 +51,17 @@ import static java.lang.String.format;
* Tables are registered as tables in the schema.
*/
class DatabaseCalciteSchema implements Schema {
+ private final boolean isBatch;
private final String databaseName;
private final String catalogName;
private final Catalog catalog;
- public DatabaseCalciteSchema(String databaseName, String catalogName, Catalog catalog) {
+ public DatabaseCalciteSchema(
+ boolean isBatch,
+ String databaseName,
+ String catalogName,
+ Catalog catalog) {
+ this.isBatch = isBatch;
this.databaseName = databaseName;
this.catalogName = catalogName;
this.catalog = catalog;
@@ -126,7 +132,7 @@ class DatabaseCalciteSchema implements Schema {
// this means the TableSource extends from StreamTableSource, this is needed for the
// legacy Planner. Blink Planner should use the information that comes from the TableSource
// itself to determine if it is a streaming or batch source.
- true,
+ !isBatch,
FlinkStatistic.UNKNOWN()
);
}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
new file mode 100644
index 0000000..c49a31d
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlConversionException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+/**
+ * Exception thrown when the conversion of a SQL statement fails.
+ */
+public class SqlConversionException extends RuntimeException {
+
+ private static final long serialVersionUID = 1L;
+
+ public SqlConversionException(String message) {
+ super(message);
+ }
+
+ public SqlConversionException(String message, Throwable e) {
+ super(message, e);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
deleted file mode 100644
index c3c6816..0000000
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlExecutableStatements.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.sqlexec;
-
-import org.apache.flink.sql.parser.ddl.SqlCreateTable;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.util.ReflectUtil;
-import org.apache.calcite.util.ReflectiveVisitor;
-
-/**
- * Mix-in tool class for {@code SqlNode} that allows DDL commands to be
- * executed directly.
- *
- * <p>For every kind of {@link SqlNode}, there needs a method named
- * #execute(type), the 'type' argument should be the subclass
- * type for the supported {@link SqlNode}.
- */
-public class SqlExecutableStatements implements ReflectiveVisitor {
- private TableEnvironment tableEnv;
-
- private final ReflectUtil.MethodDispatcher<Void> dispatcher =
- ReflectUtil.createMethodDispatcher(Void.class,
- this,
- "execute",
- SqlNode.class);
-
- //~ Constructors -----------------------------------------------------------
-
- private SqlExecutableStatements(TableEnvironment tableEnvironment) {
- this.tableEnv = tableEnvironment;
- }
-
- /**
- * This is the main entrance of executing all kinds of DDL/DML {@code SqlNode}s, different
- * SqlNode will have it's implementation in the #execute(type) method whose 'type' argument
- * is subclass of {@code SqlNode}.
- *
- * <p>Caution that the {@link #execute(SqlNode)} should never expect to be invoked.
- *
- * @param tableEnvironment TableEnvironment to interact with
- * @param sqlNode SqlNode to execute on
- */
- public static void executeSqlNode(TableEnvironment tableEnvironment, SqlNode sqlNode) {
- SqlExecutableStatements statement = new SqlExecutableStatements(tableEnvironment);
- statement.dispatcher.invoke(sqlNode);
- }
-
- /**
- * Execute the {@link SqlCreateTable} node.
- */
- public void execute(SqlCreateTable sqlCreateTable) {
- // need to implement.
- }
-
- /** Fallback method to throw exception. */
- public void execute(SqlNode node) {
- throw new TableException("Should not invoke to node type "
- + node.getClass().getSimpleName());
- }
-}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
new file mode 100644
index 0000000..88a9d94
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.sql.parser.SqlProperty;
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlTableColumn;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.calcite.FlinkTypeFactory;
+import org.apache.flink.table.calcite.FlinkTypeSystem;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.PlannerQueryOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.utils.TypeConversions;
+
+import org.apache.calcite.rel.RelRoot;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Tool class that converts a {@code SqlNode}, in particular DDL commands, into an
+ * {@link Operation}.
+ *
+ * <p>For every kind of {@link SqlNode}, there needs to be a corresponding
+ * #convert(type) method whose 'type' argument is the supported
+ * {@link SqlNode} subclass.
+ *
+ * <p>Every #convert() should return an {@link Operation} which can be used in
+ * {@link org.apache.flink.table.delegation.Planner}.
+ */
+public class SqlToOperationConverter {
+ private FlinkPlannerImpl flinkPlanner;
+
+ //~ Constructors -----------------------------------------------------------
+
+ private SqlToOperationConverter(FlinkPlannerImpl flinkPlanner) {
+ this.flinkPlanner = flinkPlanner;
+ }
+
+ /**
+ * This is the main entry point for converting all kinds of DDL/DML {@code SqlNode}s; each
+ * supported SqlNode has its own implementation in a #convert(type) method whose 'type'
+ * argument is the corresponding {@code SqlNode} subclass.
+ *
+ * @param flinkPlanner FlinkPlannerImpl to convert sql node to rel node
+ * @param sqlNode SqlNode to convert
+ */
+ public static Operation convert(FlinkPlannerImpl flinkPlanner, SqlNode sqlNode) {
+ // validate the query
+ final SqlNode validated = flinkPlanner.validate(sqlNode);
+ SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
+ if (validated instanceof SqlCreateTable) {
+ return converter.convert((SqlCreateTable) validated);
+ } else {
+ return converter.convert(validated);
+ }
+ }
+
+ /**
+ * Convert the {@link SqlCreateTable} node.
+ */
+ private Operation convert(SqlCreateTable sqlCreateTable) {
+ // primary key and unique keys are not supported
+ if ((sqlCreateTable.getPrimaryKeyList() != null
+ && sqlCreateTable.getPrimaryKeyList().size() > 0)
+ || (sqlCreateTable.getUniqueKeysList() != null
+ && sqlCreateTable.getUniqueKeysList().size() > 0)) {
+ throw new SqlConversionException("Primary key and unique key are not supported yet.");
+ }
+
+ // set with properties
+ SqlNodeList propertyList = sqlCreateTable.getPropertyList();
+ Map<String, String> properties = new HashMap<>();
+ if (propertyList != null) {
+ propertyList.getList().forEach(p ->
+ properties.put(((SqlProperty) p).getKeyString().toLowerCase(),
+ ((SqlProperty) p).getValueString()));
+ }
+
+ TableSchema tableSchema = createTableSchema(sqlCreateTable,
+ new FlinkTypeFactory(new FlinkTypeSystem())); // need to make type factory singleton ?
+ String tableComment = "";
+ if (sqlCreateTable.getComment() != null) {
+ tableComment = sqlCreateTable.getComment().getNlsString().getValue();
+ }
+ // set partition key
+ List<String> partitionKeys = new ArrayList<>();
+ SqlNodeList partitionKey = sqlCreateTable.getPartitionKeyList();
+ if (partitionKey != null) {
+ partitionKeys = partitionKey
+ .getList()
+ .stream()
+ .map(p -> ((SqlIdentifier) p).getSimple())
+ .collect(Collectors.toList());
+ }
+ CatalogTable catalogTable = new CatalogTableImpl(tableSchema,
+ partitionKeys,
+ properties,
+ tableComment);
+ return new CreateTableOperation(sqlCreateTable.fullTableName(), catalogTable,
+ sqlCreateTable.isIfNotExists());
+ }
+
+ /** Fallback method for sql query. */
+ private Operation convert(SqlNode node) {
+ if (node.getKind().belongsTo(SqlKind.QUERY)) {
+ return toQueryOperation(flinkPlanner, node);
+ } else {
+ throw new TableException("Should not invoke to node type "
+ + node.getClass().getSimpleName());
+ }
+ }
+
+ //~ Tools ------------------------------------------------------------------
+
+ /**
+ * Creates a table schema from a {@link SqlCreateTable}. The schema may contain computed
+ * column fields; e.g., given the CREATE TABLE DDL statement:
+ * <blockquote><pre>
+ * create table t(
+ * a int,
+ * b varchar,
+ * c as to_timestamp(b))
+ * with (
+ * connector = 'csv',
+ * k1 = 'v1')
+ * </pre></blockquote>
+ *
+ * <p>The returned table schema contains columns (a:int, b:varchar, c:timestamp).
+ *
+ * @param sqlCreateTable sql create table node.
+ * @param factory FlinkTypeFactory instance.
+ * @return TableSchema
+ */
+ private TableSchema createTableSchema(SqlCreateTable sqlCreateTable,
+ FlinkTypeFactory factory) {
+ // setup table columns
+ SqlNodeList columnList = sqlCreateTable.getColumnList();
+ TableSchema physicalSchema = null;
+ TableSchema.Builder builder = new TableSchema.Builder();
+ // collect the physical table schema first.
+ final List<SqlNode> physicalColumns = columnList.getList().stream()
+ .filter(n -> n instanceof SqlTableColumn).collect(Collectors.toList());
+ for (SqlNode node : physicalColumns) {
+ SqlTableColumn column = (SqlTableColumn) node;
+ final RelDataType relType = column.getType().deriveType(factory,
+ column.getType().getNullable());
+ builder.field(column.getName().getSimple(),
+ TypeConversions.fromLegacyInfoToDataType(FlinkTypeFactory.toTypeInfo(relType)));
+ physicalSchema = builder.build();
+ }
+ assert physicalSchema != null;
+ if (sqlCreateTable.containsComputedColumn()) {
+ throw new SqlConversionException("Computed columns for DDL is not supported yet!");
+ }
+ return physicalSchema;
+ }
+
+ private PlannerQueryOperation toQueryOperation(FlinkPlannerImpl planner, SqlNode validated) {
+ // transform to a relational tree
+ RelRoot relational = planner.rel(validated);
+ return new PlannerQueryOperation(relational.rel);
+ }
+}
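As a side note, a short usage sketch of the converter (illustrative only, not part of this
commit; it follows the same pattern as SqlToOperationConverterTest further down, and the DDL
text and the class name SqlToOperationConverterSketch are placeholders):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.internal.BatchTableEnvImpl;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.calcite.FlinkPlannerImpl;
    import org.apache.flink.table.operations.Operation;
    import org.apache.flink.table.operations.ddl.CreateTableOperation;
    import org.apache.flink.table.sqlexec.SqlToOperationConverter;

    public class SqlToOperationConverterSketch {
        public static void main(String[] args) {
            // Obtain a planner the same way the test below does.
            BatchTableEnvImpl env = (BatchTableEnvImpl) BatchTableEnvironment.create(
                ExecutionEnvironment.getExecutionEnvironment());
            FlinkPlannerImpl planner = env.getFlinkPlanner();

            // Parse a CREATE TABLE statement and convert it into an Operation.
            SqlNode node = planner.parse(
                "CREATE TABLE tbl1 (a bigint, b varchar) with (connector = 'csv', csv.path = 'xxx')");
            Operation operation = SqlToOperationConverter.convert(planner, node);

            // DDL statements come back as a CreateTableOperation carrying the CatalogTable.
            CreateTableOperation create = (CreateTableOperation) operation;
            System.out.println(create.asSummaryString());
        }
    }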
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 24e52d7..36e30cb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.api.internal
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.sql.parser.ddl.SqlCreateTable
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
import org.apache.flink.table.catalog._
@@ -27,11 +28,13 @@ import org.apache.flink.table.expressions._
import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction, UserDefinedAggregateFunction, _}
+import org.apache.flink.table.operations.ddl.CreateTableOperation
import org.apache.flink.table.operations.utils.OperationTreeBuilder
import org.apache.flink.table.operations.{CatalogQueryOperation, PlannerQueryOperation, TableSourceQueryOperation, _}
import org.apache.flink.table.planner.PlanningConfigurationBuilder
import org.apache.flink.table.sinks.{TableSink, TableSinkUtils}
import org.apache.flink.table.sources.TableSource
+import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.util.JavaScalaConversionUtil
import org.apache.flink.util.StringUtils
@@ -43,6 +46,7 @@ import org.apache.calcite.tools.FrameworkConfig
import _root_.java.util.Optional
import _root_.scala.collection.JavaConverters._
+import _root_.scala.collection.JavaConversions._
/**
* The abstract base class for the implementation of batch TableEnvironment.
@@ -188,7 +192,7 @@ abstract class TableEnvImpl(
}
val tableTable = new QueryOperationCatalogView(table.getQueryOperation)
- registerTableInternal(name, tableTable)
+ registerTableInternal(Array[String](name), tableTable, ignoreIfExists = false)
}
override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = {
@@ -274,7 +278,8 @@ abstract class TableEnvImpl(
// no table is registered
case _ =>
- registerTableInternal(name, ConnectorCatalogTable.source(tableSource, isBatch))
+ registerTableInternal(Array[String](name),
+ ConnectorCatalogTable.source(tableSource, isBatch), ignoreIfExists = false)
}
}
@@ -301,7 +306,8 @@ abstract class TableEnvImpl(
// no table is registered
case _ =>
- registerTableInternal(name, ConnectorCatalogTable.sink(tableSink, isBatch))
+ registerTableInternal(Array[String](name),
+ ConnectorCatalogTable.sink(tableSink, isBatch), ignoreIfExists = false)
}
}
@@ -311,17 +317,33 @@ abstract class TableEnvImpl(
}
}
- protected def registerTableInternal(name: String, table: CatalogBaseTable): Unit = {
- checkValidTableName(name)
- val path = new ObjectPath(defaultDatabaseName, name)
- JavaScalaConversionUtil.toScala(catalogManager.getCatalog(defaultCatalogName)) match {
- case Some(catalog) =>
- catalog.createTable(
- path,
- table,
- false)
- case None => throw new TableException("The default catalog does not exist.")
- }
+ /**
+ * Registers a [[CatalogBaseTable]] under a given object path. The `path` can be
+ * given in one of three formats:
+ * <ol>
+ * <li>`catalog.db.table`: A full table path including the catalog name,
+ * the database name and table name.</li>
+ * <li>`db.table`: the database name followed by the table name, resolved against the current catalog.</li>
+ * <li>`table`: only the table name, resolved against the current catalog and database.</li>
+ * </ol>
+ * The registered tables can then be referenced in SQL queries.
+ *
+ * @param path The path under which the table will be registered
+ * @param catalogTable The table to register
+ * @param ignoreIfExists If true, do nothing if a table with the same name already exists under
+ * the `path`. If false, a TableAlreadyExistException is thrown.
+ */
+ private def registerTableInternal(
+ path: Array[String],
+ catalogTable: CatalogBaseTable,
+ ignoreIfExists: Boolean): Unit = {
+ val fullName = catalogManager.getFullTablePath(path.toList)
+ val catalog = getCatalog(fullName(0)).orElseThrow(
+ new _root_.java.util.function.Supplier[TableException] {
+ override def get = new TableException(s"Catalog ${fullName(0)} does not exist")
+ })
+ val objectPath = new ObjectPath(fullName(1), fullName(2))
+ catalog.createTable(objectPath, catalogTable, ignoreIfExists)
}
protected def replaceTableInternal(name: String, table: CatalogBaseTable): Unit = {
@@ -380,6 +402,45 @@ abstract class TableEnvImpl(
planner.getCompletionHints(statement, position)
}
+ override def sql(statement: String): Table = {
+ val planner = getFlinkPlanner
+ // parse the sql query
+ val parsed = planner.parse(statement)
+ if (null != parsed) {
+ parsed match {
+ case insert: SqlInsert =>
+ val query = insert.getSource
+ val tableOperation = SqlToOperationConverter
+ .convert(planner, query)
+ .asInstanceOf[QueryOperation]
+ // get query result as Table
+ val queryResult = createTable(tableOperation)
+
+ // get name of sink table
+ val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
+
+ // insert query result into sink table
+ insertInto(queryResult, targetTablePath.asScala:_*)
+ // returns null for SQL INSERT statement now
+ null
+ case createTable: SqlCreateTable =>
+ val operation = SqlToOperationConverter
+ .convert(planner, createTable)
+ .asInstanceOf[CreateTableOperation]
+ registerTableInternal(operation.getTablePath,
+ operation.getCatalogTable,
+ operation.isIgnoreIfExists)
+ // returns null for DDL statement now
+ null
+ case query: SqlNode if query.getKind.belongsTo(SqlKind.QUERY) =>
+ createTable(SqlToOperationConverter.convert(planner, query)
+ .asInstanceOf[PlannerQueryOperation])
+ }
+ } else {
+ throw new TableException("Unsupported SQL query!")
+ }
+ }
+
override def sqlQuery(query: String): Table = {
val planner = getFlinkPlanner
// parse the sql query
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
index 2814497..e92bef4 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/calcite/FlinkPlannerImpl.scala
@@ -18,27 +18,31 @@
package org.apache.flink.table.calcite
-import java.util
+import org.apache.flink.sql.parser.ExtendedSqlNode
+import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
+import org.apache.flink.table.catalog.CatalogReader
+
import com.google.common.collect.ImmutableList
import org.apache.calcite.plan.RelOptTable.ViewExpander
import org.apache.calcite.plan._
import org.apache.calcite.prepare.CalciteCatalogReader
import org.apache.calcite.rel.RelRoot
import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rel.core.RelFactories
import org.apache.calcite.rex.RexBuilder
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.sql.advise.{SqlAdvisor, SqlAdvisorValidator}
import org.apache.calcite.sql.parser.{SqlParser, SqlParseException => CSqlParseException}
import org.apache.calcite.sql.validate.SqlValidator
-import org.apache.calcite.sql.{SqlNode, SqlOperatorTable}
+import org.apache.calcite.sql.{SqlKind, SqlNode, SqlOperatorTable}
import org.apache.calcite.sql2rel.{RelDecorrelator, SqlRexConvertletTable, SqlToRelConverter}
-import org.apache.calcite.tools.{FrameworkConfig, RelConversionException}
-import org.apache.flink.table.api.{SqlParserException, TableException, ValidationException}
-import org.apache.flink.table.catalog.CatalogReader
+import org.apache.calcite.tools.{FrameworkConfig, RelBuilder, RelConversionException}
+
+import _root_.java.lang.{Boolean => JBoolean}
+import _root_.java.util
+import _root_.java.util.function.{Function => JFunction}
import scala.collection.JavaConversions._
-import java.util.function.{Function => JFunction}
-import java.lang.{Boolean => JBoolean}
/**
* NOTE: this is heavily inspired by Calcite's PlannerImpl.
@@ -98,13 +102,24 @@ class FlinkPlannerImpl(
def validate(sqlNode: SqlNode): SqlNode = {
val catalogReader = catalogReaderSupplier.apply(false)
+ // do pre-validate rewrite.
+ sqlNode.accept(new PreValidateReWriter(catalogReader, typeFactory))
+ // do extended validation.
+ sqlNode match {
+ case node: ExtendedSqlNode =>
+ node.validate()
+ case _ =>
+ }
+ // no need to validate row type for DDL nodes.
+ if (sqlNode.getKind.belongsTo(SqlKind.DDL)) {
+ return sqlNode
+ }
validator = new FlinkCalciteSqlValidator(
operatorTable,
catalogReader,
typeFactory)
validator.setIdentifierExpansion(true)
try {
- sqlNode.accept(new PreValidateReWriter(catalogReader, typeFactory))
validator.validate(sqlNode)
}
catch {
@@ -118,10 +133,11 @@ class FlinkPlannerImpl(
assert(validatedSqlNode != null)
val rexBuilder: RexBuilder = createRexBuilder
val cluster: RelOptCluster = FlinkRelOptClusterFactory.create(planner, rexBuilder)
+ val catalogReader: CatalogReader = catalogReaderSupplier.apply(false)
val sqlToRelConverter: SqlToRelConverter = new SqlToRelConverter(
new ViewExpanderImpl,
validator,
- catalogReaderSupplier.apply(false),
+ catalogReader,
cluster,
convertletTable,
sqlToRelConverterConfig)
@@ -176,7 +192,8 @@ class FlinkPlannerImpl(
sqlToRelConverterConfig)
root = sqlToRelConverter.convertQuery(validatedSqlNode, true, false)
root = root.withRel(sqlToRelConverter.flattenTypes(root.rel, true))
- root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel))
+ val relBuilder = createRelBuilder(root.rel.getCluster, catalogReader)
+ root = root.withRel(RelDecorrelator.decorrelateQuery(root.rel, relBuilder))
FlinkPlannerImpl.this.root
}
}
@@ -185,6 +202,11 @@ class FlinkPlannerImpl(
new RexBuilder(typeFactory)
}
+ private def createRelBuilder(
+ relOptCluster: RelOptCluster,
+ relOptSchema: RelOptSchema): RelBuilder = {
+ RelFactories.LOGICAL_BUILDER.create(relOptCluster, relOptSchema)
+ }
}
object FlinkPlannerImpl {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
index fd5a5c7..502ea36 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/StreamPlanner.scala
@@ -23,16 +23,6 @@ import org.apache.flink.api.dag.Transformation
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSink}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-
-import org.apache.calcite.jdbc.CalciteSchema
-import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
-import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind, SqlNode}
-
-import _root_.java.lang.{Boolean => JBool}
-import _root_.java.util.{Objects, List => JList}
-import java.util
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{CalciteConfig, FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory}
import org.apache.flink.table.catalog.{CatalogManager, CatalogManagerCalciteSchema, CatalogTable, ConnectorCatalogTable, _}
@@ -48,9 +38,20 @@ import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.plan.util.UpdatingPlanChecker
import org.apache.flink.table.runtime.types.CRow
import org.apache.flink.table.sinks._
+import org.apache.flink.table.sqlexec.SqlToOperationConverter
import org.apache.flink.table.types.utils.TypeConversions
import org.apache.flink.table.util.JavaScalaConversionUtil
+import org.apache.calcite.jdbc.CalciteSchema
+import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.sql.{SqlIdentifier, SqlInsert, SqlKind}
+
+import _root_.java.lang.{Boolean => JBool}
+import _root_.java.util
+import _root_.java.util.{Objects, List => JList}
+
import _root_.scala.collection.JavaConverters._
/**
@@ -104,18 +105,21 @@ class StreamPlanner(
if (targetColumnList != null && insert.getTargetColumnList.size() != 0) {
throw new ValidationException("Partial inserts are not supported")
}
-
// get name of sink table
val targetTablePath = insert.getTargetTable.asInstanceOf[SqlIdentifier].names
- List(new CatalogSinkModifyOperation(targetTablePath, toRel(planner, insert.getSource))
+ List(new CatalogSinkModifyOperation(targetTablePath,
+ SqlToOperationConverter.convert(planner,
+ insert.getSource).asInstanceOf[PlannerQueryOperation])
.asInstanceOf[Operation]).asJava
- case node if node.getKind.belongsTo(SqlKind.QUERY) =>
- List(toRel(planner, parsed).asInstanceOf[Operation]).asJava
+ case node if node.getKind.belongsTo(SqlKind.QUERY) || node.getKind.belongsTo(SqlKind.DDL) =>
+ List(SqlToOperationConverter.convert(planner, parsed)).asJava
case _ =>
throw new TableException(
"Unsupported SQL query! parse() only accepts SQL queries of type " +
- "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT.")
+ "SELECT, UNION, INTERSECT, EXCEPT, VALUES, ORDER_BY or INSERT;" +
+ "and SQL DDLs of type " +
+ "CREATE TABLE")
}
}
@@ -141,17 +145,6 @@ class StreamPlanner(
planner.getCompletionHints(statement, position)
}
- private def toRel(
- planner: FlinkPlannerImpl,
- parsed: SqlNode)
- : PlannerQueryOperation = {
- // validate the sql query
- val validated = planner.validate(parsed)
- // transform to a relational tree
- val relational = planner.rel(validated)
- new PlannerQueryOperation(relational.rel)
- }
-
private def translate(tableOperation: ModifyOperation)
: Transformation[_] = {
tableOperation match {
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
index 20d78e3..312b5de 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/catalog/DatabaseCalciteSchemaTest.java
@@ -48,7 +48,7 @@ public class DatabaseCalciteSchemaTest {
@Test
public void testCatalogTable() throws TableAlreadyExistException, DatabaseNotExistException {
GenericInMemoryCatalog catalog = new GenericInMemoryCatalog(catalogName, databaseName);
- DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(
+ DatabaseCalciteSchema calciteSchema = new DatabaseCalciteSchema(false,
databaseName,
catalogName,
catalog);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
new file mode 100644
index 0000000..cc78be7
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/sqlexec/SqlToOperationConverterTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sqlexec;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.internal.BatchTableEnvImpl;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.types.DataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/** Test cases for SqlToOperationConverter. */
+public class SqlToOperationConverterTest {
+ private static final ExecutionEnvironment streamExec =
+ ExecutionEnvironment.getExecutionEnvironment();
+ private static final BatchTableEnvImpl batchEnv =
+ (BatchTableEnvImpl) BatchTableEnvironment.create(streamExec);
+
+ private static final FlinkPlannerImpl planner = batchEnv.getFlinkPlanner();
+
+ @Test
+ public void testCreateTable() {
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a bigint,\n" +
+ " b varchar, \n" +
+ " c int, \n" +
+ " d varchar" +
+ ")\n" +
+ " PARTITIONED BY (a, d)\n" +
+ " with (\n" +
+ " connector = 'kafka', \n" +
+ " kafka.topic = 'log.test'\n" +
+ ")\n";
+ SqlNode node = planner.parse(sql);
+ assert node instanceof SqlCreateTable;
+ Operation operation = SqlToOperationConverter.convert(planner, node);
+ assert operation instanceof CreateTableOperation;
+ CreateTableOperation op = (CreateTableOperation) operation;
+ CatalogTable catalogTable = op.getCatalogTable();
+ assertEquals(Arrays.asList("a", "d"), catalogTable.getPartitionKeys());
+ assertArrayEquals(catalogTable.getSchema().getFieldNames(),
+ new String[] {"a", "b", "c", "d"});
+ assertArrayEquals(catalogTable.getSchema().getFieldDataTypes(),
+ new DataType[]{
+ DataTypes.BIGINT(),
+ DataTypes.VARCHAR(Integer.MAX_VALUE),
+ DataTypes.INT(),
+ DataTypes.VARCHAR(Integer.MAX_VALUE)});
+ }
+
+ @Test(expected = SqlConversionException.class)
+ public void testCreateTableWithPkUniqueKeys() {
+ final String sql = "CREATE TABLE tbl1 (\n" +
+ " a bigint,\n" +
+ " b varchar, \n" +
+ " c int, \n" +
+ " d varchar, \n" +
+ " primary key(a), \n" +
+ " unique(a, b) \n" +
+ ")\n" +
+ " PARTITIONED BY (a, d)\n" +
+ " with (\n" +
+ " connector = 'kafka', \n" +
+ " kafka.topic = 'log.test'\n" +
+ ")\n";
+ SqlNode node = planner.parse(sql);
+ assert node instanceof SqlCreateTable;
+ SqlToOperationConverter.convert(planner, node);
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index c5fe13f..e28eb5d 100644
--- a/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -22,3 +22,4 @@ org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory
org.apache.flink.table.factories.utils.TestExternalCatalogFactory
org.apache.flink.table.catalog.TestExternalTableSourceFactory
org.apache.flink.table.factories.utils.TestCatalogFactory
+org.apache.flink.table.factories.utils.TestCollectionTableFactory
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
new file mode 100644
index 0000000..fc97071
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog
+
+import org.apache.flink.api.scala.ExecutionEnvironment
+import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory
+import org.apache.flink.types.Row
+
+import org.junit.Assert.assertEquals
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, Ignore, Test}
+
+import java.util
+
+import scala.collection.JavaConversions._
+
+/** IT cases for catalog tables created via SQL DDL, parameterized over batch and streaming. */
+@RunWith(classOf[Parameterized])
+class CatalogTableITCase(isStreaming: Boolean) {
+
+ private val batchExec: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+ private var batchEnv: BatchTableEnvironment = _
+ private val streamExec: StreamExecutionEnvironment = StreamExecutionEnvironment
+ .getExecutionEnvironment
+ private var streamEnv: StreamTableEnvironment = _
+
+ private val SOURCE_DATA = List(
+ toRow(1, "a"),
+ toRow(2, "b"),
+ toRow(3, "c")
+ )
+
+ private val DIM_DATA = List(
+ toRow(1, "aDim"),
+ toRow(2, "bDim"),
+ toRow(3, "cDim")
+ )
+
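+ // Orders rows by the concatenation of their fields' string representations so that
+ // results can be compared independently of row order.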
+ implicit def rowOrdering: Ordering[Row] = Ordering.by((r : Row) => {
+ val builder = new StringBuilder
+ 0 until r.getArity foreach(idx => builder.append(r.getField(idx)))
+ builder.toString()
+ })
+
+ @Before
+ def before(): Unit = {
+ batchExec.setParallelism(4)
+ streamExec.setParallelism(4)
+ batchEnv = BatchTableEnvironment.create(batchExec)
+ streamEnv = StreamTableEnvironment.create(streamExec)
+ TestCollectionTableFactory.reset()
+ TestCollectionTableFactory.isStreaming = isStreaming
+ }
+
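+ /** Creates a [[Row]] populated with the given field values. */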
+ def toRow(args: Any*): Row = {
+ val row = new Row(args.length)
+ 0 until args.length foreach {
+ i => row.setField(i, args(i))
+ }
+ row
+ }
+
+ def tableEnv: TableEnvironment = {
+ if (isStreaming) {
+ streamEnv
+ } else {
+ batchEnv
+ }
+ }
+
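+ /** Executes the job on the batch or streaming environment, depending on the test mode. */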
+ def execJob(name: String): Unit = {
+ if (isStreaming) {
+ streamExec.execute(name)
+ } else {
+ batchExec.execute(name)
+ }
+ }
+
+ @Test
+ def testInsertInto(): Unit = {
+ val sourceData = List(
+ toRow(1, "1000", 2),
+ toRow(2, "1", 3),
+ toRow(3, "2000", 4),
+ toRow(1, "2", 2),
+ toRow(2, "3000", 3)
+ )
+ TestCollectionTableFactory.initData(sourceData)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b varchar,
+ | c int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b varchar,
+ | c int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select t1.a, t1.b, (t1.a + 1) as c from t1
+ """.stripMargin
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Ignore // not yet implemented
+ @Test
+ def testInsertTargetTableWithComputedColumn(): Unit = {
+ TestCollectionTableFactory.initData(SOURCE_DATA)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b varchar,
+ | c int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b varchar,
+ | c as a + 1
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2(a, b)
+ |select t1.a, t1.b from t1
+ """.stripMargin
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(SOURCE_DATA.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Test
+ def testInsertWithJoinedSource(): Unit = {
+ val sourceData = List(
+ toRow(1, 1000, 2),
+ toRow(2, 1, 3),
+ toRow(3, 2000, 4),
+ toRow(1, 2, 2),
+ toRow(2, 3000, 3)
+ )
+
+ val expected = List(
+ toRow(1, 1000, 2, 1),
+ toRow(1, 2, 2, 1),
+ toRow(2, 1, 1, 2),
+ toRow(2, 3000, 1, 2)
+ )
+ TestCollectionTableFactory.initData(sourceData)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b int,
+ | c int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b int,
+ | c int,
+ | d int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select a.a, a.b, b.a, b.b
+ | from t1 a
+ | join t1 b
+ | on a.a = b.b
+ """.stripMargin
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Test
+ def testInsertWithAggregateSource(): Unit = {
+ if (isStreaming) {
+ return
+ }
+ val sourceData = List(
+ toRow(1, 1000, 2),
+ toRow(2, 1000, 3),
+ toRow(3, 2000, 4),
+ toRow(4, 2000, 5),
+ toRow(5, 3000, 6)
+ )
+
+ val expected = List(
+ toRow(3, 1000),
+ toRow(5, 3000),
+ toRow(7, 2000)
+ )
+ TestCollectionTableFactory.initData(sourceData)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b int,
+ | c int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select sum(a), t1.b from t1 group by t1.b
+ """.stripMargin
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(expected.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Test @Ignore // not yet implemented
+ def testStreamSourceTableWithProctime(): Unit = {
+ val sourceData = List(
+ toRow(1, 1000),
+ toRow(2, 2000),
+ toRow(3, 3000)
+ )
+ TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b int,
+ | c as proctime,
+ | primary key(a)
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
+ """.stripMargin
+
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Test @Ignore // not yet implemented
+ def testStreamSourceTableWithRowtime(): Unit = {
+ val sourceData = List(
+ toRow(1, 1000),
+ toRow(2, 2000),
+ toRow(3, 3000)
+ )
+ TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | primary key(a),
+ | WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a bigint,
+ | b bigint
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
+ """.stripMargin
+
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Test @Ignore // not yet implemented
+ def testBatchSourceTableWithProctime(): Unit = {
+ val sourceData = List(
+ toRow(1, 1000),
+ toRow(2, 2000),
+ toRow(3, 3000)
+ )
+ TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a int,
+ | b int,
+ | c as proctime,
+ | primary key(a)
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a int,
+ | b int
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select sum(a), sum(b) from t1 group by TUMBLE(c, INTERVAL '1' SECOND)
+ """.stripMargin
+
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+
+ @Test @Ignore // not yet implemented
+ def testBatchTableWithRowtime(): Unit = {
+ val sourceData = List(
+ toRow(1, 1000),
+ toRow(2, 2000),
+ toRow(3, 3000)
+ )
+ TestCollectionTableFactory.initData(sourceData, emitInterval = 1000L)
+ val sourceDDL =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | primary key(a),
+ | WATERMARK wm FOR a AS BOUNDED WITH DELAY 1000 MILLISECOND
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val sinkDDL =
+ """
+ |create table t2(
+ | a bigint,
+ | b bigint
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val query =
+ """
+ |insert into t2
+ |select sum(a), sum(b) from t1 group by TUMBLE(wm, INTERVAL '1' SECOND)
+ """.stripMargin
+
+ tableEnv.sql(sourceDDL)
+ tableEnv.sql(sinkDDL)
+ tableEnv.sql(query)
+ execJob("testJob")
+ assertEquals(sourceData.sorted, TestCollectionTableFactory.RESULT.sorted)
+ }
+}
+
+object CatalogTableITCase {
+ @Parameterized.Parameters(name = "{0}")
+ def parameters(): java.util.Collection[Boolean] = {
+ util.Arrays.asList(true, false)
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
new file mode 100644
index 0000000..3b17986
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/utils/TestCollectionTableFactory.scala
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeutils.TypeSerializer
+import org.apache.flink.api.java.io.{CollectionInputFormat, LocalCollectionOutputFormat}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.datastream.{DataStream, DataStreamSource}
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
+import org.apache.flink.table.api.TableSchema
+import org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR
+import org.apache.flink.table.descriptors.{DescriptorProperties, Schema}
+import org.apache.flink.table.factories.utils.TestCollectionTableFactory.{getCollectionSink, getCollectionSource}
+import org.apache.flink.table.factories.{BatchTableSinkFactory, BatchTableSourceFactory, StreamTableSinkFactory, StreamTableSourceFactory}
+import org.apache.flink.table.functions.{AsyncTableFunction, TableFunction}
+import org.apache.flink.table.sinks.{AppendStreamTableSink, BatchTableSink, StreamTableSink, TableSink}
+import org.apache.flink.table.sources.{BatchTableSource, LookupableTableSource, StreamTableSource, TableSource}
+import org.apache.flink.types.Row
+
+import java.io.IOException
+import java.util
+import java.util.{ArrayList => JArrayList, LinkedList => JLinkedList, List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
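+/**
+ * Table factory for tests that reads from and writes to shared in-memory collections.
+ * It is registered under the 'COLLECTION' connector through the TableFactory service file.
+ */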
+class TestCollectionTableFactory
+ extends StreamTableSourceFactory[Row]
+ with StreamTableSinkFactory[Row]
+ with BatchTableSourceFactory[Row]
+ with BatchTableSinkFactory[Row]
+{
+
+ override def createTableSource(properties: JMap[String, String]): TableSource[Row] = {
+ getCollectionSource(properties, isStreaming = TestCollectionTableFactory.isStreaming)
+ }
+
+ override def createTableSink(properties: JMap[String, String]): TableSink[Row] = {
+ getCollectionSink(properties)
+ }
+
+ override def createStreamTableSource(properties: JMap[String, String]): StreamTableSource[Row] = {
+ getCollectionSource(properties, isStreaming = true)
+ }
+
+ override def createStreamTableSink(properties: JMap[String, String]): StreamTableSink[Row] = {
+ getCollectionSink(properties)
+ }
+
+ override def createBatchTableSource(properties: JMap[String, String]): BatchTableSource[Row] = {
+ getCollectionSource(properties, isStreaming = false)
+ }
+
+ override def createBatchTableSink(properties: JMap[String, String]): BatchTableSink[Row] = {
+ getCollectionSink(properties)
+ }
+
+ override def requiredContext(): JMap[String, String] = {
+ val context = new util.HashMap[String, String]()
+ context.put(CONNECTOR, "COLLECTION")
+ context
+ }
+
+ override def supportedProperties(): JList[String] = {
+ val supported = new JArrayList[String]()
+ supported.add("*")
+ supported
+ }
+}
+
+object TestCollectionTableFactory {
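+ /** Controls whether createTableSource builds a streaming or a batch source. */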
+ var isStreaming: Boolean = true
+
+ val SOURCE_DATA = new JLinkedList[Row]()
+ val DIM_DATA = new JLinkedList[Row]()
+ val RESULT = new JLinkedList[Row]()
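+ /** Pause between emitted records in milliseconds; values <= 0 disable throttling. */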
+ private var emitIntervalMS = -1L
+
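+ /** Registers the source and dimension data (and an optional emit interval) for a test. */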
+ def initData(sourceData: JList[Row],
+ dimData: JList[Row] = List(),
+ emitInterval: Long = -1L): Unit = {
+ SOURCE_DATA.addAll(sourceData)
+ DIM_DATA.addAll(dimData)
+ emitIntervalMS = emitInterval
+ }
+
+ def reset(): Unit = {
+ RESULT.clear()
+ SOURCE_DATA.clear()
+ DIM_DATA.clear()
+ emitIntervalMS = -1L
+ }
+
+ def getCollectionSource(props: JMap[String, String],
+ isStreaming: Boolean): CollectionTableSource = {
+ val properties = new DescriptorProperties()
+ properties.putProperties(props)
+ val schema = properties.getTableSchema(Schema.SCHEMA)
+ new CollectionTableSource(emitIntervalMS, schema, isStreaming)
+ }
+
+ def getCollectionSink(props: JMap[String, String]): CollectionTableSink = {
+ val properties = new DescriptorProperties()
+ properties.putProperties(props)
+ val schema = properties.getTableSchema(Schema.SCHEMA)
+ new CollectionTableSink(schema.toRowType.asInstanceOf[RowTypeInfo])
+ }
+
+ /**
+ * Table source backed by the shared in-memory collection; it can serve as a batch,
+ * streaming, or lookup (dimension) source.
+ */
+ class CollectionTableSource(
+ val emitIntervalMs: Long,
+ val schema: TableSchema,
+ val isStreaming: Boolean)
+ extends BatchTableSource[Row]
+ with StreamTableSource[Row]
+ with LookupableTableSource[Row] {
+
+ private val rowType: TypeInformation[Row] = schema.toRowType
+
+ override def isBounded: Boolean = !isStreaming
+
+ def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+ execEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+ SOURCE_DATA,
+ rowType.createSerializer(new ExecutionConfig)),
+ rowType)
+ }
+
+ override def getDataStream(streamEnv: StreamExecutionEnvironment): DataStreamSource[Row] = {
+ streamEnv.createInput(new TestCollectionInputFormat[Row](emitIntervalMs,
+ SOURCE_DATA,
+ rowType.createSerializer(new ExecutionConfig)),
+ rowType)
+ }
+
+ override def getReturnType: TypeInformation[Row] = rowType
+
+ override def getTableSchema: TableSchema = {
+ schema
+ }
+
+ override def getLookupFunction(lookupKeys: Array[String]): TemporalTableFetcher = {
+ new TemporalTableFetcher(DIM_DATA, lookupKeys.map(schema.getFieldNames.indexOf(_)))
+ }
+
+ override def getAsyncLookupFunction(lookupKeys: Array[String]): AsyncTableFunction[Row] = null
+
+ override def isAsyncEnabled: Boolean = false
+ }
+
+ /**
+ * Table sink that collects emitted rows into the shared RESULT collection.
+ */
+ class CollectionTableSink(val outputType: RowTypeInfo)
+ extends BatchTableSink[Row]
+ with AppendStreamTableSink[Row] {
+ override def emitDataSet(dataSet: DataSet[Row]): Unit = {
+ dataSet.output(new LocalCollectionOutputFormat[Row](RESULT)).setParallelism(1)
+ }
+
+ override def getOutputType: RowTypeInfo = outputType
+
+ override def getFieldNames: Array[String] = outputType.getFieldNames
+
+ override def getFieldTypes: Array[TypeInformation[_]] = {
+ outputType.getFieldTypes
+ }
+
+ override def emitDataStream(dataStream: DataStream[Row]): Unit = {
+ dataStream.addSink(new UnsafeMemorySinkFunction(outputType)).setParallelism(1)
+ }
+
+ override def configure(fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = this
+ }
+
+ /**
+ * Sink function that copies every incoming row and adds it to the RESULT collection.
+ */
+ class UnsafeMemorySinkFunction(outputType: TypeInformation[Row]) extends RichSinkFunction[Row] {
+ private var serializer: TypeSerializer[Row] = _
+
+ override def open(param: Configuration): Unit = {
+ serializer = outputType.createSerializer(new ExecutionConfig)
+ }
+
+ @throws[Exception]
+ override def invoke(row: Row): Unit = {
+ RESULT.add(serializer.copy(row))
+ }
+ }
+
+ /**
+ * Collection input format for testing that can sleep between records to emulate
+ * a slow source.
+ */
+ class TestCollectionInputFormat[T](
+ val emitIntervalMs: Long,
+ val dataSet: java.util.Collection[T],
+ val serializer: TypeSerializer[T])
+ extends CollectionInputFormat[T](dataSet, serializer) {
+ @throws[IOException]
+ override def reachedEnd: Boolean = {
+ if (emitIntervalMs > 0) {
+ try {
+ Thread.sleep(emitIntervalMs)
+ } catch {
+ // Ignore interruptions and simply stop throttling.
+ case _: InterruptedException =>
+ }
+ }
+ super.reachedEnd
+ }
+ }
+
+ /**
+ * Dimension table source fetcher.
+ */
+ class TemporalTableFetcher(
+ val dimData: JLinkedList[Row],
+ val keys: Array[Int]) extends TableFunction[Row] {
+
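+ /** Emits every dimension row whose key fields match the given lookup values. */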
+ @throws[Exception]
+ def eval(values: Any*): Unit = {
+ for (data <- dimData) {
+ var matched = true
+ var idx = 0
+ while (matched && idx < keys.length) {
+ val dimField = data.getField(keys(idx))
+ val inputField = values(idx)
+ matched = dimField.equals(inputField)
+ idx += 1
+ }
+ if (matched) {
+ // copy the row data
+ val ret = new Row(data.getArity)
+ 0 until data.getArity foreach { idx =>
+ ret.setField(idx, data.getField(idx))
+ }
+ collect(ret)
+ }
+ }
+ }
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
index cb4567f..af72842 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/MockTableEnvironment.scala
@@ -96,4 +96,6 @@ class MockTableEnvironment extends TableEnvironment {
sinkPathContinued: String*): Unit = ???
override def execute(jobName: String): JobExecutionResult = ???
+
+ override def sql(statement: String): Table = ???
}