Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/12 05:25:53 UTC

[flink] 01/02: [FLINK-13211][table-planner] Add drop table support for flink planner

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58280f44f5ff62d129580bb091671a52537536e3
Author: yuzhao.cyz <yu...@alibaba-inc.com>
AuthorDate: Thu Jul 11 19:34:16 2019 +0800

    [FLINK-13211][table-planner] Add drop table support for flink planner
---
 .../table/api/internal/TableEnvironmentImpl.java   | 23 ++++-
 .../flink/table/operations/ddl/DropOperation.java  | 30 +++++++
 .../table/operations/ddl/DropTableOperation.java   | 59 +++++++++++++
 .../table/sqlexec/SqlToOperationConverter.java     | 11 ++-
 .../flink/table/api/internal/TableEnvImpl.scala    | 20 ++++-
 .../flink/table/catalog/CatalogTableITCase.scala   | 97 ++++++++++++++++++++++
 6 files changed, 236 insertions(+), 4 deletions(-)
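
For context, a minimal usage sketch of the new DDL (not part of the commit; it
mirrors the ITCase added below, and tableEnv is assumed to be a
TableEnvironment backed by the flink planner):

    // Usage sketch: with this change, sqlUpdate() also accepts DROP TABLE.
    tableEnv.sqlUpdate(
      """
        |create table t1(
        |  a bigint,
        |  b varchar
        |) with (
        |  connector = 'COLLECTION'
        |)
      """.stripMargin)
    // A simple name resolves against the current catalog/database ...
    tableEnv.sqlUpdate("DROP TABLE t1")
    // ... and IF EXISTS suppresses the error for a missing table.
    tableEnv.sqlUpdate("DROP TABLE IF EXISTS default_catalog.default_database.t1")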

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 9f98d2b..988fa88 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -40,6 +40,7 @@ import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
@@ -57,6 +58,7 @@ import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.operations.TableSourceQueryOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.sources.TableSource;
@@ -337,7 +339,7 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		if (operations.size() != 1) {
 			throw new TableException(
 				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of type " +
-					"INSERT or CREATE TABLE");
+					"INSERT, CREATE TABLE, or DROP TABLE");
 		}
 
 		Operation operation = operations.get(0);
@@ -355,9 +357,26 @@ public class TableEnvironmentImpl implements TableEnvironment {
 				createTableOperation.getTablePath(),
 				createTableOperation.getCatalogTable(),
 				createTableOperation.isIgnoreIfExists());
+		} else if (operation instanceof DropTableOperation) {
+			String[] name = ((DropTableOperation) operation).getTableName();
+			boolean isIfExists = ((DropTableOperation) operation).isIfExists();
+			String[] paths = catalogManager.getFullTablePath(Arrays.asList(name));
+			Optional<Catalog> catalog = getCatalog(paths[0]);
+			if (!catalog.isPresent()) {
+				if (!isIfExists) {
+					throw new TableException("Catalog " + paths[0] + " does not exist.");
+				}
+			} else {
+				try {
+					catalog.get().dropTable(new ObjectPath(paths[1], paths[2]), isIfExists);
+				} catch (TableNotExistException e) {
+					throw new TableException(e.getMessage());
+				}
+			}
 		} else {
 			throw new TableException(
-				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statements of type INSERT.");
+				"Unsupported SQL query! sqlUpdate() only accepts a single SQL statement of " +
+					"type INSERT, CREATE TABLE, or DROP TABLE");
 		}
 	}
 
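The new branch above relies on CatalogManager.getFullTablePath to normalize a
one-, two-, or three-part table name into a full [catalog, database, table]
triple. A rough sketch of that resolution, assuming the current defaults fill
in the missing leading parts (the real logic lives in CatalogManager, not
here):

    // Hypothetical sketch of the path normalization the DROP TABLE branch
    // depends on; the name and signature are illustrative only.
    def fullTablePath(
        name: Seq[String],
        currentCatalog: String,
        currentDatabase: String): Array[String] = name match {
      case Seq(table) => Array(currentCatalog, currentDatabase, table)
      case Seq(db, table) => Array(currentCatalog, db, table)
      case Seq(catalog, db, table) => Array(catalog, db, table)
      case _ => throw new IllegalArgumentException(
        s"Invalid table path: ${name.mkString(".")}")
    }
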
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropOperation.java
new file mode 100644
index 0000000..a338e6a
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropOperation.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * An {@link Operation} that describes a DROP DDL statement, e.g. DROP TABLE or DROP DATABASE.
+ *
+ * <p>Different sub-operations can have their own target identifiers. For example, a drop table
+ * operation may have a target table name and an IF EXISTS flag.
+ */
+public interface DropOperation extends Operation {
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
new file mode 100644
index 0000000..e173e02
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ddl/DropTableOperation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.ddl;
+
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.OperationUtils;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * Operation to describe a DROP TABLE statement.
+ */
+public class DropTableOperation implements DropOperation {
+	private final String[] tableName;
+	private final boolean ifExists;
+
+	public DropTableOperation(String[] tableName, boolean ifExists) {
+		this.tableName = tableName;
+		this.ifExists = ifExists;
+	}
+
+	public String[] getTableName() {
+		return this.tableName;
+	}
+
+	public boolean isIfExists() {
+		return this.ifExists;
+	}
+
+	@Override
+	public String asSummaryString() {
+		Map<String, Object> params = new LinkedHashMap<>();
+		params.put("IfExists", ifExists);
+
+		return OperationUtils.formatWithChildren(
+			"DROP TABLE",
+			params,
+			Collections.emptyList(),
+			Operation::asSummaryString);
+	}
+}
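
DropTableOperation is a plain value object; the converter change below builds
it straight from the parsed SqlDropTable. A small, hypothetical construction
example (the identifiers are illustrative):

    // Assembling the operation by hand, as SqlToOperationConverter does below.
    val op = new DropTableOperation(
      Array("default_catalog", "default_database", "t1"),
      true) // ifExists
    println(op.asSummaryString()) // the summary records the IfExists flag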
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
index 88a9d94..d07cf16 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/sqlexec/SqlToOperationConverter.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.sqlexec;
 
 import org.apache.flink.sql.parser.SqlProperty;
 import org.apache.flink.sql.parser.ddl.SqlCreateTable;
+import org.apache.flink.sql.parser.ddl.SqlDropTable;
 import org.apache.flink.sql.parser.ddl.SqlTableColumn;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
@@ -31,6 +32,7 @@ import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.PlannerQueryOperation;
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.operations.ddl.DropTableOperation;
 import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.rel.RelRoot;
@@ -68,7 +70,7 @@ public class SqlToOperationConverter {
 
 	/**
 	 * This is the main entrance for executing all kinds of DDL/DML {@code SqlNode}s, different
-	 * SqlNode will have it's implementation in the #execute(type) method whose 'type' argument
+	 * SqlNode will have its implementation in the #convert(type) method whose 'type' argument
 	 * is subclass of {@code SqlNode}.
 	 *
 	 * @param flinkPlanner     FlinkPlannerImpl to convert sql node to rel node
@@ -80,6 +82,8 @@ public class SqlToOperationConverter {
 		SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner);
 		if (validated instanceof SqlCreateTable) {
 			return converter.convert((SqlCreateTable) validated);
+		} else if (validated instanceof SqlDropTable) {
+			return converter.convert((SqlDropTable) validated);
 		} else {
 			return converter.convert(validated);
 		}
@@ -130,6 +134,11 @@ public class SqlToOperationConverter {
 			sqlCreateTable.isIfNotExists());
 	}
 
+	/** Convert DROP TABLE statement. */
+	private Operation convert(SqlDropTable sqlDropTable) {
+		return new DropTableOperation(sqlDropTable.fullTableName(), sqlDropTable.getIfExists());
+	}
+
 	/** Fallback method for sql query. */
 	private Operation convert(SqlNode node) {
 		if (node.getKind().belongsTo(SqlKind.QUERY)) {
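
The javadoc above describes the converter's structure: one public entry point
that checks the concrete type of the validated SqlNode and hands it to a
matching private convert(...) overload. A minimal standalone sketch of that
shape, using stand-in types rather than Calcite's SqlNode hierarchy:

    // Stand-ins for the SqlNode subtypes; illustrative only.
    sealed trait Node
    case class CreateNode(name: String) extends Node
    case class DropNode(name: String, ifExists: Boolean) extends Node

    object Converter {
      // The entry point dispatches on the concrete subtype, as convert() does above.
      def convert(validated: Node): String = validated match {
        case c: CreateNode => convertCreate(c)
        case d: DropNode => convertDrop(d)
      }
      private def convertCreate(c: CreateNode): String = s"CREATE TABLE ${c.name}"
      private def convertDrop(d: DropNode): String =
        s"DROP TABLE ${d.name} (ifExists=${d.ifExists})"
    }
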
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 75f9fa0..03ef425 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.api.internal
 
 import org.apache.flink.annotation.VisibleForTesting
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.sql.parser.ddl.SqlCreateTable
+import org.apache.flink.sql.parser.ddl.{SqlCreateTable, SqlDropTable}
 import org.apache.flink.table.api._
 import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder}
 import org.apache.flink.table.catalog._
+import org.apache.flink.table.catalog.exceptions.TableNotExistException
 import org.apache.flink.table.expressions._
 import org.apache.flink.table.expressions.resolver.lookups.TableReferenceLookup
 import org.apache.flink.table.factories.{TableFactoryService, TableFactoryUtil, TableSinkFactory}
@@ -457,6 +458,23 @@ abstract class TableEnvImpl(
         registerCatalogTableInternal(operation.getTablePath,
           operation.getCatalogTable,
           operation.isIgnoreIfExists)
+      case dropTable: SqlDropTable =>
+        val name = dropTable.fullTableName()
+        val isIfExists = dropTable.getIfExists
+        val paths = catalogManager.getFullTablePath(name.toList)
+        val catalog = getCatalog(paths(0))
+        if (!catalog.isPresent) {
+          if (!isIfExists) {
+            throw new TableException(s"Catalog ${paths(0)} does not exist.")
+          }
+        } else {
+          try {
+            catalog.get().dropTable(new ObjectPath(paths(1), paths(2)), isIfExists)
+          } catch {
+            case e: TableNotExistException =>
+              throw new TableException(e.getMessage)
+          }
+        }
       case _ =>
         throw new TableException(
           "Unsupported SQL query! sqlUpdate() only accepts SQL statements of type INSERT.")
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
index 28983c5..d8e5ed2 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/catalog/CatalogTableITCase.scala
@@ -440,6 +440,103 @@ class CatalogTableITCase(isStreaming: Boolean) {
     execJob("testJob")
     assertEquals(TestCollectionTableFactory.RESULT.sorted, sourceData.sorted)
   }
+
+  @Test
+  def testDropTableWithFullPath(): Unit = {
+    val ddl1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val ddl2 =
+      """
+        |create table t2(
+        |  a bigint,
+        |  b bigint
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+
+    tableEnv.sqlUpdate(ddl1)
+    tableEnv.sqlUpdate(ddl2)
+    assert(tableEnv.listTables().sameElements(Array[String]("t1", "t2")))
+    tableEnv.sqlUpdate("DROP TABLE default_catalog.default_database.t2")
+    assert(tableEnv.listTables().sameElements(Array("t1")))
+  }
+
+  @Test
+  def testDropTableWithPartialPath(): Unit = {
+    val ddl1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+    val ddl2 =
+      """
+        |create table t2(
+        |  a bigint,
+        |  b bigint
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+
+    tableEnv.sqlUpdate(ddl1)
+    tableEnv.sqlUpdate(ddl2)
+    assert(tableEnv.listTables().sameElements(Array[String]("t1", "t2")))
+    tableEnv.sqlUpdate("DROP TABLE default_database.t2")
+    tableEnv.sqlUpdate("DROP TABLE t1")
+    assert(tableEnv.listTables().isEmpty)
+  }
+
+  @Test(expected = classOf[TableException])
+  def testDropTableWithInvalidPath(): Unit = {
+    val ddl1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+
+    tableEnv.sqlUpdate(ddl1)
+    assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+    tableEnv.sqlUpdate("DROP TABLE catalog1.database1.t1")
+    assert(tableEnv.listTables().isEmpty)
+  }
+
+  @Test
+  def testDropTableWithInvalidPathIfExists(): Unit = {
+    val ddl1 =
+      """
+        |create table t1(
+        |  a bigint,
+        |  b bigint,
+        |  c varchar
+        |) with (
+        |  connector = 'COLLECTION'
+        |)
+      """.stripMargin
+
+    tableEnv.sqlUpdate(ddl1)
+    assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+    tableEnv.sqlUpdate("DROP TABLE IF EXISTS catalog1.database1.t1")
+    assert(tableEnv.listTables().sameElements(Array[String]("t1")))
+  }
 }
 
 object CatalogTableITCase {