You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/02/25 23:56:03 UTC
[flink] branch master updated: [FLINK-15349] add 'create catalog'
DDL to blink planner
This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b92803d [FLINK-15349] add 'create catalog' DDL to blink planner
b92803d is described below
commit b92803d3bf7fd0d251f2b70afc70dccb7ff695ec
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon Feb 17 20:13:23 2020 -0800
[FLINK-15349] add 'create catalog' DDL to blink planner
add 'create catalog' DDL to blink planner
closes #11116.
---
.../flink/table/client/cli/SqlCommandParser.java | 4 +
.../table/client/cli/SqlCommandParserTest.java | 4 +
.../src/main/codegen/data/Parser.tdd | 2 +
.../src/main/codegen/includes/parserImpls.ftl | 24 +++++
.../flink/sql/parser/ddl/SqlCreateCatalog.java | 109 +++++++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 15 +++
.../table/api/internal/TableEnvironmentImpl.java | 11 +++
.../operations/ddl/CreateCatalogOperation.java | 62 ++++++++++++
.../operations/SqlToOperationConverter.java | 22 +++++
.../flink/table/planner/catalog/CatalogITCase.java | 54 ++++++++++
10 files changed, 307 insertions(+)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 1387d18..03e2f63 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -111,6 +111,10 @@ public final class SqlCommandParser {
"USE\\s+(?!CATALOG)(.*)",
SINGLE_OPERAND),
+ CREATE_CATALOG(
+ "(CREATE\\s+CATALOG\\s+.*)",
+ SINGLE_OPERAND),
+
DESCRIBE(
"DESCRIBE\\s+(.*)",
SINGLE_OPERAND),
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 54e0a7d..2509c3c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -83,6 +83,10 @@ public class SqlCommandParserTest {
testValidSqlCommand("reset;", new SqlCommandCall(SqlCommand.RESET));
testValidSqlCommand("source /my/file", new SqlCommandCall(SqlCommand.SOURCE, new String[] {"/my/file"}));
testInvalidSqlCommand("source"); // missing path
+ testValidSqlCommand("create CATALOG c1",
+ new SqlCommandCall(SqlCommand.CREATE_CATALOG, new String[]{"create CATALOG c1"}));
+ testValidSqlCommand("create CATALOG c1 WITH ('k'='v')",
+ new SqlCommandCall(SqlCommand.CREATE_CATALOG, new String[]{"create CATALOG c1 WITH ('k'='v')"}));
testValidSqlCommand("USE CATALOG default", new SqlCommandCall(SqlCommand.USE_CATALOG, new String[]{"default"}));
testValidSqlCommand("use default", new SqlCommandCall(SqlCommand.USE, new String[] {"default"}));
testInvalidSqlCommand("use catalog");
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 7f4011a..1783a5e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -43,6 +43,7 @@
"org.apache.flink.sql.parser.ddl.SqlAlterTable",
"org.apache.flink.sql.parser.ddl.SqlAlterTableRename",
"org.apache.flink.sql.parser.ddl.SqlAlterTableProperties",
+ "org.apache.flink.sql.parser.ddl.SqlCreateCatalog",
"org.apache.flink.sql.parser.dml.RichSqlInsert",
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
"org.apache.flink.sql.parser.dql.SqlShowCatalogs",
@@ -453,6 +454,7 @@
# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
createStatementParserMethods: [
+ "SqlCreateCatalog",
"SqlCreateTable",
"SqlCreateView",
"SqlCreateDatabase",
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index c8c4395..a93ef66 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -56,6 +56,30 @@ SqlUseCatalog SqlUseCatalog() :
}
/**
+* Parses a create catalog statement.
+* CREATE CATALOG catalog_name [WITH (property_name=property_value, ...)];
+*/
+SqlCreate SqlCreateCatalog(Span s, boolean replace) :
+{
+ SqlParserPos startPos;
+ SqlIdentifier catalogName;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+ <CATALOG> { startPos = getPos(); }
+ catalogName = SimpleIdentifier()
+ [
+ <WITH>
+ propertyList = TableProperties()
+ ]
+ {
+ return new SqlCreateCatalog(startPos.plus(getPos()),
+ catalogName,
+ propertyList);
+ }
+}
+
+/**
* Parse a "Show Catalogs" metadata query command.
*/
SqlShowDatabases SqlShowDatabases() :
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
new file mode 100644
index 0000000..94e69d4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * CREATE CATALOG DDL sql call.
+ */
+public class SqlCreateCatalog extends SqlCreate implements ExtendedSqlNode {
+
+ public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE CATALOG", SqlKind.OTHER_DDL);
+
+ private final SqlIdentifier catalogName;
+
+ private final SqlNodeList propertyList;
+
+ public SqlCreateCatalog(
+ SqlParserPos position,
+ SqlIdentifier catalogName,
+ SqlNodeList propertyList) {
+ super(OPERATOR, position, false, false);
+ this.catalogName = requireNonNull(catalogName, "catalogName cannot be null");
+ this.propertyList = requireNonNull(propertyList, "propertyList cannot be null");
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(catalogName, propertyList);
+ }
+
+ public SqlIdentifier getCatalogName() {
+ return catalogName;
+ }
+
+ public SqlNodeList getPropertyList() {
+ return propertyList;
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("CREATE CATALOG");
+ catalogName.unparse(writer, leftPrec, rightPrec);
+
+ if (this.propertyList.size() > 0) {
+ writer.keyword("WITH");
+ SqlWriter.Frame withFrame = writer.startList("(", ")");
+ for (SqlNode property : propertyList) {
+ printIndent(writer);
+ property.unparse(writer, leftPrec, rightPrec);
+ }
+ writer.newlineAndIndent();
+ writer.endList(withFrame);
+ }
+ }
+
+ private void printIndent(SqlWriter writer) {
+ writer.sep(",", false);
+ writer.newlineAndIndent();
+ writer.print(" ");
+ }
+
+ public String catalogName() {
+ return catalogName.names.get(0);
+ }
+
+ @Override
+ public void validate() throws SqlValidateException {
+
+ }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 6bb594c..3057324 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -96,6 +96,21 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
}
@Test
+ public void testCreateCatalog() {
+ check(
+ "create catalog c1\n" +
+ " WITH (\n" +
+ " 'key1'='value1',\n" +
+ " 'key2'='value2'\n" +
+ " )\n",
+ "CREATE CATALOG `C1` " +
+ "WITH (\n" +
+ " 'key1' = 'value1',\n" +
+ " 'key2' = 'value2'\n" +
+ ")");
+ }
+
+ @Test
public void testShowDataBases() {
check("show databases", "SHOW DATABASES");
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 7f8706e..9e0b7ac 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -83,6 +84,7 @@ import org.apache.flink.table.operations.ddl.AlterTableOperation;
import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
@@ -663,6 +665,15 @@ public class TableEnvironmentImpl implements TableEnvironment {
DropTempSystemFunctionOperation dropTempSystemFunctionOperation =
(DropTempSystemFunctionOperation) operation;
dropSystemFunction(dropTempSystemFunctionOperation);
+ } else if (operation instanceof CreateCatalogOperation) {
+ CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
+ String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
+ try {
+ catalogManager.registerCatalog(
+ createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
+ } catch (CatalogException e) {
+ throw new ValidationException(exMsg, e);
+ }
} else if (operation instanceof UseCatalogOperation) {
UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
new file mode 100644
index 0000000..1d04a20
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operation to describe a CREATE CATALOG statement.
+ */
+public class CreateCatalogOperation implements CreateOperation {
+ private final String catalogName;
+ private final Catalog catalog;
+
+ public CreateCatalogOperation(String catalogName, Catalog catalog) {
+ this.catalogName = checkNotNull(catalogName);
+ this.catalog = checkNotNull(catalog);
+ }
+
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ public Catalog getCatalog() {
+ return catalog;
+ }
+
+ @Override
+ public String asSummaryString() {
+ Map<String, Object> params = new LinkedHashMap<>();
+ params.put("catalogName", catalogName);
+
+ return OperationUtils.formatWithChildren(
+ "CREATE CATALOG",
+ params,
+ Collections.emptyList(),
+ Operation::asSummaryString);
+ }
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 6b1c5e7..6e7c96c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -23,6 +23,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
import org.apache.flink.sql.parser.ddl.SqlAlterTable;
import org.apache.flink.sql.parser.ddl.SqlAlterTableProperties;
import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
@@ -49,6 +50,8 @@ import org.apache.flink.table.catalog.FunctionLanguage;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.UseCatalogOperation;
@@ -58,6 +61,7 @@ import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
@@ -150,6 +154,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));
} else if (validated instanceof SqlAlterDatabase) {
return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));
+ } else if (validated instanceof SqlCreateCatalog) {
+ return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
return Optional.of(converter.convertSqlQuery(validated));
} else {
@@ -358,6 +364,22 @@ public class SqlToOperationConverter {
return new UseCatalogOperation(useCatalog.getCatalogName());
}
+ /** Convert CREATE CATALOG statement. */
+ private Operation convertCreateCatalog(SqlCreateCatalog sqlCreateCatalog) {
+ String catalogName = sqlCreateCatalog.catalogName();
+
+ // set with properties
+ Map<String, String> properties = new HashMap<>();
+ sqlCreateCatalog.getPropertyList().getList().forEach(p ->
+ properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
+
+ final CatalogFactory factory =
+ TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader());
+
+ Catalog catalog = factory.createCatalog(catalogName, properties);
+ return new CreateCatalogOperation(catalogName, catalog);
+ }
+
/** Convert use database statement. */
private Operation convertUseDatabase(SqlUseDatabase useDatabase) {
String[] fullDatabaseName = useDatabase.fullDatabaseName();
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
new file mode 100644
index 0000000..dae1666
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * IT Case for catalog ddl.
+ */
+public class CatalogITCase {
+
+ @Test
+ public void testCreateCatalog() {
+ String name = "c1";
+ TableEnvironment tableEnv = getTableEnvironment();
+ String ddl = String.format("create catalog %s with('type'='%s')", name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);
+
+ tableEnv.sqlUpdate(ddl);
+
+ assertTrue(tableEnv.getCatalog(name).isPresent());
+ assertTrue(tableEnv.getCatalog(name).get() instanceof GenericInMemoryCatalog);
+ }
+
+ private TableEnvironment getTableEnvironment() {
+ EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ return StreamTableEnvironment.create(env, settings);
+ }
+}