You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by bl...@apache.org on 2020/02/25 23:56:03 UTC

[flink] branch master updated: [FLINK-15349] add 'create catalog' DDL to blink planner

This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b92803d  [FLINK-15349] add 'create catalog' DDL to blink planner
b92803d is described below

commit b92803d3bf7fd0d251f2b70afc70dccb7ff695ec
Author: bowen.li <bo...@gmail.com>
AuthorDate: Mon Feb 17 20:13:23 2020 -0800

    [FLINK-15349] add 'create catalog' DDL to blink planner
    
    add 'create catalog' DDL to blink planner
    
    closes #11116.
---
 .../flink/table/client/cli/SqlCommandParser.java   |   4 +
 .../table/client/cli/SqlCommandParserTest.java     |   4 +
 .../src/main/codegen/data/Parser.tdd               |   2 +
 .../src/main/codegen/includes/parserImpls.ftl      |  24 +++++
 .../flink/sql/parser/ddl/SqlCreateCatalog.java     | 109 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  15 +++
 .../table/api/internal/TableEnvironmentImpl.java   |  11 +++
 .../operations/ddl/CreateCatalogOperation.java     |  62 ++++++++++++
 .../operations/SqlToOperationConverter.java        |  22 +++++
 .../flink/table/planner/catalog/CatalogITCase.java |  54 ++++++++++
 10 files changed, 307 insertions(+)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
index 1387d18..03e2f63 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/SqlCommandParser.java
@@ -111,6 +111,10 @@ public final class SqlCommandParser {
 			"USE\\s+(?!CATALOG)(.*)",
 			SINGLE_OPERAND),
 
+		CREATE_CATALOG(
+			"(CREATE\\s+CATALOG\\s+.*)",
+			SINGLE_OPERAND),
+
 		DESCRIBE(
 			"DESCRIBE\\s+(.*)",
 			SINGLE_OPERAND),
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
index 54e0a7d..2509c3c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/SqlCommandParserTest.java
@@ -83,6 +83,10 @@ public class SqlCommandParserTest {
 		testValidSqlCommand("reset;", new SqlCommandCall(SqlCommand.RESET));
 		testValidSqlCommand("source /my/file", new SqlCommandCall(SqlCommand.SOURCE, new String[] {"/my/file"}));
 		testInvalidSqlCommand("source"); // missing path
+		testValidSqlCommand("create CATALOG c1",
+			new SqlCommandCall(SqlCommand.CREATE_CATALOG, new String[]{"create CATALOG c1"}));
+		testValidSqlCommand("create CATALOG c1 WITH ('k'='v')",
+			new SqlCommandCall(SqlCommand.CREATE_CATALOG, new String[]{"create CATALOG c1 WITH ('k'='v')"}));
 		testValidSqlCommand("USE CATALOG default", new SqlCommandCall(SqlCommand.USE_CATALOG, new String[]{"default"}));
 		testValidSqlCommand("use default", new SqlCommandCall(SqlCommand.USE, new String[] {"default"}));
 		testInvalidSqlCommand("use catalog");
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 7f4011a..1783a5e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -43,6 +43,7 @@
     "org.apache.flink.sql.parser.ddl.SqlAlterTable",
     "org.apache.flink.sql.parser.ddl.SqlAlterTableRename",
     "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties",
+    "org.apache.flink.sql.parser.ddl.SqlCreateCatalog",
     "org.apache.flink.sql.parser.dml.RichSqlInsert",
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword",
     "org.apache.flink.sql.parser.dql.SqlShowCatalogs",
@@ -453,6 +454,7 @@
   # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
   # Each must accept arguments "(SqlParserPos pos, boolean replace)".
   createStatementParserMethods: [
+    "SqlCreateCatalog",
     "SqlCreateTable",
     "SqlCreateView",
     "SqlCreateDatabase",
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index c8c4395..a93ef66 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -56,6 +56,30 @@ SqlUseCatalog SqlUseCatalog() :
 }
 
 /**
+* Parses a create catalog statement.
+* CREATE CATALOG catalog_name [WITH (property_name=property_value, ...)];
+*/
+SqlCreate SqlCreateCatalog(Span s, boolean replace) :
+{
+    SqlParserPos startPos;
+    SqlIdentifier catalogName;
+    SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+    <CATALOG> { startPos = getPos(); }
+    catalogName = SimpleIdentifier()
+    [
+        <WITH>
+        propertyList = TableProperties()
+    ]
+    {
+        return new SqlCreateCatalog(startPos.plus(getPos()),
+            catalogName,
+            propertyList);
+    }
+}
+
+/**
 * Parse a "Show Catalogs" metadata query command.
 */
 SqlShowDatabases SqlShowDatabases() :
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
new file mode 100644
index 0000000..94e69d4
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateCatalog.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCreate;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+
+/**
+ * SQL call node for the {@code CREATE CATALOG} DDL statement.
+ */
+public class SqlCreateCatalog extends SqlCreate implements ExtendedSqlNode {
+
+	public static final SqlSpecialOperator OPERATOR = new SqlSpecialOperator("CREATE CATALOG", SqlKind.OTHER_DDL);
+
+	private final SqlIdentifier catalogName;
+
+	private final SqlNodeList propertyList;
+
+	public SqlCreateCatalog(
+			SqlParserPos position,
+			SqlIdentifier catalogName,
+			SqlNodeList propertyList) {
+		super(OPERATOR, position, false, false);
+		this.catalogName = requireNonNull(catalogName, "catalogName cannot be null");
+		this.propertyList = requireNonNull(propertyList, "propertyList cannot be null");
+	}
+
+	@Override
+	public SqlOperator getOperator() {
+		return OPERATOR;
+	}
+
+	@Override
+	public List<SqlNode> getOperandList() {
+		return ImmutableNullableList.of(catalogName, propertyList);
+	}
+
+	public SqlIdentifier getCatalogName() {
+		return catalogName;
+	}
+
+	public SqlNodeList getPropertyList() {
+		return propertyList;
+	}
+
+	@Override
+	public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+		writer.keyword("CREATE CATALOG");
+		catalogName.unparse(writer, leftPrec, rightPrec);
+
+		if (this.propertyList.size() > 0) {
+			writer.keyword("WITH");
+			SqlWriter.Frame withFrame = writer.startList("(", ")");
+			for (SqlNode property : propertyList) {
+				printIndent(writer);
+				property.unparse(writer, leftPrec, rightPrec);
+			}
+			writer.newlineAndIndent();
+			writer.endList(withFrame);
+		}
+	}
+
+	private void printIndent(SqlWriter writer) {
+		writer.sep(",", false);
+		writer.newlineAndIndent();
+		writer.print("  ");
+	}
+
+	public String catalogName() {
+		return catalogName.names.get(0);
+	}
+
+	@Override
+	public void validate() throws SqlValidateException {
+
+	}
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 6bb594c..3057324 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -96,6 +96,21 @@ public class FlinkSqlParserImplTest extends SqlParserTest {
 	}
 
 	@Test
+	public void testCreateCatalog() {
+		check(
+			"create catalog c1\n" +
+				" WITH (\n" +
+				"  'key1'='value1',\n" +
+				"  'key2'='value2'\n" +
+				" )\n",
+			"CREATE CATALOG `C1` " +
+				"WITH (\n" +
+				"  'key1' = 'value1',\n" +
+				"  'key2' = 'value2'\n" +
+				")");
+	}
+
+	@Test
 	public void testShowDataBases() {
 		check("show databases", "SHOW DATABASES");
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 7f8706e..9e0b7ac 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
@@ -83,6 +84,7 @@ import org.apache.flink.table.operations.ddl.AlterTableOperation;
 import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
@@ -663,6 +665,15 @@ public class TableEnvironmentImpl implements TableEnvironment {
 			DropTempSystemFunctionOperation dropTempSystemFunctionOperation =
 				(DropTempSystemFunctionOperation) operation;
 			dropSystemFunction(dropTempSystemFunctionOperation);
+		} else if (operation instanceof CreateCatalogOperation) {
+			CreateCatalogOperation createCatalogOperation = (CreateCatalogOperation) operation;
+			String exMsg = getDDLOpExecuteErrorMsg(createCatalogOperation.asSummaryString());
+			try {
+				catalogManager.registerCatalog(
+					createCatalogOperation.getCatalogName(), createCatalogOperation.getCatalog());
+			} catch (CatalogException e) {
+				throw new ValidationException(exMsg, e);
+			}
 		} else if (operation instanceof UseCatalogOperation) {
 			UseCatalogOperation useCatalogOperation = (UseCatalogOperation) operation;
 			catalogManager.setCurrentCatalog(useCatalogOperation.getCatalogName());
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
new file mode 100644
index 0000000..1d04a20
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/CreateCatalogOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Operation to describe a CREATE CATALOG statement.
+ */
+public class CreateCatalogOperation implements CreateOperation {
+	private final String catalogName;
+	private final Catalog catalog;
+
+	public CreateCatalogOperation(String catalogName, Catalog catalog) {
+		this.catalogName = checkNotNull(catalogName);
+		this.catalog = checkNotNull(catalog);
+	}
+
+	public String getCatalogName() {
+		return catalogName;
+	}
+
+	public Catalog getCatalog() {
+		return catalog;
+	}
+
+	@Override
+	public String asSummaryString() {
+		Map<String, Object> params = new LinkedHashMap<>();
+		params.put("catalogName", catalogName);
+
+		return OperationUtils.formatWithChildren(
+			"CREATE CATALOG",
+			params,
+			Collections.emptyList(),
+			Operation::asSummaryString);
+	}
+}
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 6b1c5e7..6e7c96c 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -23,6 +23,7 @@ import org.apache.flink.sql.parser.ddl.SqlAlterFunction;
 import org.apache.flink.sql.parser.ddl.SqlAlterTable;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableProperties;
 import org.apache.flink.sql.parser.ddl.SqlAlterTableRename;
+import org.apache.flink.sql.parser.ddl.SqlCreateCatalog;
 import org.apache.flink.sql.parser.ddl.SqlCreateDatabase;
 import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
@@ -49,6 +50,8 @@ import org.apache.flink.table.catalog.FunctionLanguage;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.factories.CatalogFactory;
+import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.UseCatalogOperation;
@@ -58,6 +61,7 @@ import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
 import org.apache.flink.table.operations.ddl.AlterTablePropertiesOperation;
 import org.apache.flink.table.operations.ddl.AlterTableRenameOperation;
 import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
 import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
@@ -150,6 +154,8 @@ public class SqlToOperationConverter {
 			return Optional.of(converter.convertDropDatabase((SqlDropDatabase) validated));
 		} else if (validated instanceof SqlAlterDatabase) {
 			return Optional.of(converter.convertAlterDatabase((SqlAlterDatabase) validated));
+		} else if (validated instanceof SqlCreateCatalog) {
+			return Optional.of(converter.convertCreateCatalog((SqlCreateCatalog) validated));
 		} else if (validated.getKind().belongsTo(SqlKind.QUERY)) {
 			return Optional.of(converter.convertSqlQuery(validated));
 		} else {
@@ -358,6 +364,22 @@ public class SqlToOperationConverter {
 		return new UseCatalogOperation(useCatalog.getCatalogName());
 	}
 
+	/** Convert CREATE CATALOG statement. */
+	private Operation convertCreateCatalog(SqlCreateCatalog sqlCreateCatalog) {
+		String catalogName = sqlCreateCatalog.catalogName();
+
+		// collect the key/value properties declared in the WITH clause
+		Map<String, String> properties = new HashMap<>();
+		sqlCreateCatalog.getPropertyList().getList().forEach(p ->
+			properties.put(((SqlTableOption) p).getKeyString(), ((SqlTableOption) p).getValueString()));
+
+		final CatalogFactory factory =
+			TableFactoryService.find(CatalogFactory.class, properties, this.getClass().getClassLoader());
+
+		Catalog catalog = factory.createCatalog(catalogName, properties);
+		return new CreateCatalogOperation(catalogName, catalog);
+	}
+
 	/** Convert use database statement. */
 	private Operation convertUseDatabase(SqlUseDatabase useDatabase) {
 		String[] fullDatabaseName = useDatabase.fullDatabaseName();
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
new file mode 100644
index 0000000..dae1666
--- /dev/null
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.catalog;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+
+import org.junit.Test;
+
+import static org.apache.flink.table.descriptors.GenericInMemoryCatalogValidator.CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration test case for catalog DDL statements.
+ */
+public class CatalogITCase {
+
+	@Test
+	public void testCreateCatalog() {
+		String name = "c1";
+		TableEnvironment tableEnv = getTableEnvironment();
+		String ddl = String.format("create catalog %s with('type'='%s')", name, CATALOG_TYPE_VALUE_GENERIC_IN_MEMORY);
+
+		tableEnv.sqlUpdate(ddl);
+
+		assertTrue(tableEnv.getCatalog(name).isPresent());
+		assertTrue(tableEnv.getCatalog(name).get() instanceof GenericInMemoryCatalog);
+	}
+
+	private TableEnvironment getTableEnvironment() {
+		EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		return StreamTableEnvironment.create(env, settings);
+	}
+}