You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/12 05:25:53 UTC
[flink] 01/02: [FLINK-13211][table-planner] Add drop table support
for flink planner
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 58280f44f5ff62d129580bb091671a52537536e3
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 19:34:16 2019 +0800
[FLINK-13211][table-planner] Add drop table support for flink planner
---
.../table/api/internal/TableEnvironmentImpl.java | 23 ++++-
.../flink/table/operations/ddl/DropOperation.java | 30 +++++++
.../table/operations/ddl/DropTableOperation.java | 59 +++++++++++++
.../table/sqlexec/SqlToOperationConverter.java | 11 ++-
.../flink/table/api/internal/TableEnvImpl.scala | 20 ++++-
.../flink/table/catalog/CatalogTableITCase.scala | 97 ++++++++++++++++++++++
6 files changed, 236 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 9f98d2b..988fa88 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
@@ -57,6 +58,7 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
@@ -337,7 +339,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
if (operations.size() != 1) {
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
- "INSERT or CREATE TABLE");
+ "INSERT, CREATE TABLE, DROP TABLE");
}
Operation operation = operations.get(0);
@@ -355,9 +357,26 @@ public class TableEnvironmentImpl implements TableEnvironment {
createTableOperation.getTablePath(),
createTableOperation.getCatalogTable(),
createTableOperation.isIgnoreIfExists());
+ } else if (operation instanceof DropTableOperation) {
+ String[] name = ((DropTableOperation) operation).getTableName();
+ boolean isIfExists = ((DropTableOperation) operation).isIfExists();
+ String[] paths = catalogManager.getFullTablePath(Arrays.asList(name));
+ Optional<Catalog> catalog = getCatalog(paths[0]);
+ if (!catalog.isPresent()) {
+ if (!isIfExists) {
+ throw new TableException("Catalog " + paths[0] + " does not exist.");
+ }
+ } else {
+ try {
+ catalog.get().dropTable(new ObjectPath(paths[1], paths[2]), isIfExists);
+ } catch (TableNotExistException e) {
+ throw new TableException(e.getMessage());
+ }
+ }
} else {
throw new TableException(
- "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
+ "Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of " +
+ "type INSERT, CREATE TABLE, DROP TABLE");
}
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropOperation.java
new file mode 100644
index 0000000..a338e6a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropOperation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * An {@link Operation} that describes a DROP DDL statement, e.g. DROP TABLE or DROP DATABASE.
+ *
+ * <p>This interface declares no methods of its own. Different sub operations can carry their
+ * own target-specific state. For example, a drop table operation may have a target table name
+ * and a flag that tells whether the statement was issued with IF EXISTS.
+ */
+public interface DropOperation extends Operation {
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
new file mode 100644
index 0000000..e173e02
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DROP TABLE statement.
+ *
+ * <p>Holds the name parts of the table to drop, exactly as they were handed to the constructor,
+ * together with the IF EXISTS flag of the statement.
+ */
+public class DropTableOperation implements DropOperation {
+    private final String[] tableName;
+    private final boolean ifExists;
+
+    /**
+     * Creates a DROP TABLE operation.
+     *
+     * @param tableName name parts of the table to drop; must not be null. The array is copied,
+     *                  so later changes made by the caller do not affect this operation.
+     * @param ifExists  the IF EXISTS flag of the statement
+     * @throws NullPointerException if {@code tableName} is null
+     */
+    public DropTableOperation(String[] tableName, boolean ifExists) {
+        // Arrays are mutable: keep a private copy so this operation cannot be altered from outside.
+        this.tableName = java.util.Objects.requireNonNull(tableName, "tableName must not be null").clone();
+        this.ifExists = ifExists;
+    }
+
+    /**
+     * Returns the name parts of the table to drop.
+     *
+     * @return a fresh copy of the name parts; modifying it does not affect this operation
+     */
+    public String[] getTableName() {
+        return this.tableName.clone();
+    }
+
+    /**
+     * Returns the IF EXISTS flag this operation was created with.
+     */
+    public boolean isIfExists() {
+        return this.ifExists;
+    }
+
+    /**
+     * Builds the summary through {@link OperationUtils#formatWithChildren} using the operation
+     * name "DROP TABLE", the parameters "TableName" and "IfExists" and no children.
+     */
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        // Name the target table; a summary carrying only the IF EXISTS flag cannot tell two
+        // DROP TABLE operations apart.
+        params.put("TableName", String.join(".", tableName));
+        params.put("IfExists", ifExists);
+
+        return OperationUtils.formatWithChildren(
+            "DROP TABLE",
+            params,
+            Collections.emptyList(),
+            Operation::asSummaryString);
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 88a9d94..d07cf16 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.sqlexec;
import org.apache.flink.sql.parser.SqlProperty;
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlDropTable;
import org.apache.flink.sql.parser.ddl.SqlTableColumn;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
@@ -31,6 +32,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.PlannerQueryOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.calcite.rel.RelRoot;
@@ -68,7 +70,7 @@ public class SqlToOperationConverter {
/**
* This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different
- * SqlNode will have it's implementation in the #execute(type) method whose 'type' argument
+ * SqlNode will have its implementation in the #convert(type) method whose 'type' argument
* is subclass of {@code SqlNode}.
*
* @param flinkPlanner FlinkPlannerImpl to convert sql node to rel node
@@ -80,6 +82,8 @@ public class SqlToOperationConverter {
SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
if (validated instanceof SqlCreateTable) {
return converter.convert((SqlCreateTable) validated);
+ } else if (validated instanceof SqlDropTable){
+ return converter.convert((SqlDropTable) validated);
} else {
return converter.convert(validated);
}
@@ -130,6 +134,11 @@ public class SqlToOperationConverter {
sqlCreateTable.isIfNotExists());
}
+ /** Convert DROP TABLE statement. */
+ private Operation convert(SqlDropTable sqlDropTable) {
+ return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
+ }
+
/** Fallback method for sql query. */
private Operation convert(SqlNode node) {
if (node.getKind().belongsTo(SqlKind.QUERY)) {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 75f9fa0..03ef425 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api.internal
import org.apache.flink.annotation.VisibleForTesting
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.sql.parser.ddl.SqlCreateTable
+import org.apache.flink.sql.parser.ddl.{SqlCreateTable, SqlDropTable}
import org.apache.flink.table.api._
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
import org.apache.flink.table.catalog._
+import org.apache.flink.table.catalog.exceptions.TableNotExistException
import org.apache.flink.table.expressions._
import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
@@ -457,6 +458,23 @@ abstract class TableEnvImpl(
registerCatalogTableInternal(operation.getTablePath,
operation.getCatalogTable,
operation.isIgnoreIfExists)
+ case dropTable: SqlDropTable =>
+ val name = dropTable.fullTableName()
+ val isIfExists = dropTable.getIfExists
+ val paths = catalogManager.getFullTablePath(name.toList)
+ val catalog = getCatalog(paths(0))
+ if (!catalog.isPresent) {
+ if (!isIfExists) {
+ throw new TableException(s"Catalog ${paths(0)} does not exist.")
+ }
+ } else {
+ try
+ catalog.get().dropTable(new ObjectPath(paths(1), paths(2)), isIfExists)
+ catch {
+ case e: TableNotExistException =>
+ throw new TableException(e.getMessage)
+ }
+ }
case _ =>
throw new TableException(
"Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index 28983c5..d8e5ed2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -440,6 +440,103 @@ class CatalogTableITCase(isStreaming: Boolean) {
execJob("testJob")
assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
}
+
+ /**
+  * Creates tables t1 and t2 through DDL, drops t2 using the three-part path
+  * default_catalog.default_database.t2 and checks that only t1 is still listed.
+  */
+ @Test
+ def testDropTableWithFullPath(): Unit = {
+ // DDL for the table that is expected to survive the drop.
+ val ddl1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ // DDL for the table that is dropped below.
+ val ddl2 =
+ """
+ |create table t2(
+ | a bigint,
+ | b bigint
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+
+ tableEnv.sqlUpdate(ddl1)
+ tableEnv.sqlUpdate(ddl2)
+ // Both tables must be listed before the drop.
+ assert(tableEnv.listTables().sameElements(Array[String]("t1", "t2")))
+ // Drop with catalog, database and table name all spelled out.
+ tableEnv.sqlUpdate("DROP TABLE default_catalog.default_database.t2")
+ assert(tableEnv.listTables().sameElements(Array("t1")))
+ }
+
+ /**
+  * Creates tables t1 and t2 through DDL, drops t2 using a database-qualified name and t1
+  * using the bare table name, then checks that no tables are listed any more.
+  */
+ @Test
+ def testDropTableWithPartialPath(): Unit = {
+ val ddl1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+ val ddl2 =
+ """
+ |create table t2(
+ | a bigint,
+ | b bigint
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+
+ tableEnv.sqlUpdate(ddl1)
+ tableEnv.sqlUpdate(ddl2)
+ // Both tables must be listed before the drops.
+ assert(tableEnv.listTables().sameElements(Array[String]("t1", "t2")))
+ // Two-part path: database and table name, no catalog.
+ tableEnv.sqlUpdate("DROP TABLE default_database.t2")
+ // One-part path: table name only.
+ tableEnv.sqlUpdate("DROP TABLE t1")
+ assert(tableEnv.listTables().isEmpty)
+ }
+
+ /**
+  * Creates table t1, then issues DROP TABLE (without IF EXISTS) against the path
+  * catalog1.database1.t1 and checks that the statement is rejected with a TableException
+  * while t1 stays listed.
+  *
+  * The exception is caught around the DROP statement only, so that a TableException raised
+  * by the CREATE step cannot make this test pass by accident.
+  */
+ @Test
+ def testDropTableWithInvalidPath(): Unit = {
+ val ddl1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+
+ tableEnv.sqlUpdate(ddl1)
+ assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+ // Record whether the DROP statement itself was rejected.
+ var dropRejected = false
+ try {
+   tableEnv.sqlUpdate("DROP TABLE catalog1.database1.t1")
+ } catch {
+   case _: TableException => dropRejected = true
+ }
+ assert(dropRejected)
+ // A rejected drop must leave the existing table untouched.
+ assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+ }
+
+ /**
+  * Creates table t1, issues DROP TABLE IF EXISTS against the path catalog1.database1.t1 and
+  * checks that the statement completes without throwing and that t1 is still listed.
+  */
+ @Test
+ def testDropTableWithInvalidPathIfExists(): Unit = {
+ val ddl1 =
+ """
+ |create table t1(
+ | a bigint,
+ | b bigint,
+ | c varchar
+ |) with (
+ | connector = 'COLLECTION'
+ |)
+ """.stripMargin
+
+ tableEnv.sqlUpdate(ddl1)
+ assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+ // NOTE(review): catalog1/database1 are not created in this test; presumably they are not
+ // registered by the shared test setup either - confirm.
+ tableEnv.sqlUpdate("DROP TABLE IF EXISTS catalog1.database1.t1")
+ // The table created above must be unaffected.
+ assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+ }
}
object CatalogTableITCase {