Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/23 02:56:17 UTC

[spark] branch branch-3.1 updated: [SPARK-33364][SQL][FOLLOWUP] Refine the catalog v2 API to purge a table

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 60a1e59  [SPARK-33364][SQL][FOLLOWUP] Refine the catalog v2 API to purge a table
60a1e59 is described below

commit 60a1e59cec146b02bb19ad2542037fb0fa276afc
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Dec 23 11:47:13 2020 +0900

    [SPARK-33364][SQL][FOLLOWUP] Refine the catalog v2 API to purge a table
    
    This is a followup of https://github.com/apache/spark/pull/30267
    
    Inspired by https://github.com/apache/spark/pull/30886, it's better to have two methods, `def dropTable` and `def purgeTable`, than the overloads `def dropTable(ident)` and `def dropTable(ident, purge)`:
    
    1. It makes the APIs orthogonal. Previously, `def dropTable(ident, purge)` called `def dropTable(ident)` and was a superset of it.
    2. It simplifies catalog implementations a little: the `if (purge) ... else ...` check is now done on the Spark side, as sketched below.
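
    A hedged sketch of what an implementation looks like under the new API
    (`MyCatalog` and its backing `metastore` are hypothetical stand-ins, and
    the other required `TableCatalog` methods are elided):

        import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

        class MyCatalog extends TableCatalog {
          // Plain drop: the backing store may move the data to a trash.
          override def dropTable(ident: Identifier): Boolean =
            metastore.drop(ident)

          // Purge: skip the trash and remove the data for good.
          override def purgeTable(ident: Identifier): Boolean =
            metastore.dropAndDeleteData(ident)

          // ... name, listTables, loadTable, createTable, etc. elided ...
        }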
    
    No user-facing change.
    
    Tested with the existing tests.
    
    Closes #30890 from cloud-fan/purgeTable.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
    (cherry picked from commit ec1560af251d2c3580f5bccfabc750f1c7af09df)
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../catalog/DelegatingCatalogExtension.java        |  5 +++
 .../spark/sql/connector/catalog/TableCatalog.java  | 17 +++----
 .../sql/connector/catalog/TableCatalogSuite.scala  |  5 +++
 .../execution/datasources/v2/DropTableExec.scala   |  2 +-
 .../sql/execution/command/v2/DropTableSuite.scala  | 52 ++++++++++++++++++++++
 5 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index d07d299..34f07b1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -100,6 +100,11 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension {
   }
 
   @Override
+  public boolean purgeTable(Identifier ident) {
+    return asTableCatalog().purgeTable(ident);
+  }
+
+  @Override
   public void renameTable(
       Identifier oldIdent,
       Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
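
Because DelegatingCatalogExtension forwards purgeTable to the delegate, custom
session catalogs built on it inherit the new method and can decorate it. A
hedged sketch (AuditingCatalog is hypothetical, not part of this patch):

    import org.apache.spark.sql.connector.catalog.{DelegatingCatalogExtension, Identifier}

    // Hypothetical extension that logs purges before handing the call to the
    // delegated session catalog via the forwarding method added above.
    class AuditingCatalog extends DelegatingCatalogExtension {
      override def purgeTable(ident: Identifier): Boolean = {
        println(s"purging table $ident")  // audit hook; swap in real logging
        super.purgeTable(ident)           // delegates to asTableCatalog().purgeTable
      }
    }
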
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 52a74ab..4163d86 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -173,26 +173,23 @@ public interface TableCatalog extends CatalogPlugin {
   boolean dropTable(Identifier ident);
 
   /**
-   * Drop a table in the catalog with an option to purge.
+   * Drop a table in the catalog and completely remove its data by skipping the trash, even if
+   * the trash is supported.
    * <p>
    * If the catalog supports views and contains a view for the identifier and not a table, this
    * must not drop the view and must return false.
    * <p>
-   * If the catalog supports the option to purge a table, this method must be overridden.
-   * The default implementation falls back to {@link #dropTable(Identifier)} dropTable} if the
-   * purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}.
+   * If the catalog supports purging a table, this method should be overridden.
+   * The default implementation throws {@link UnsupportedOperationException}.
    *
    * @param ident a table identifier
-   * @param purge whether a table should be purged
    * @return true if a table was deleted, false if no table exists for the identifier
+   * @throws UnsupportedOperationException If table purging is not supported
    *
    * @since 3.1.0
    */
-  default boolean dropTable(Identifier ident, boolean purge) {
-    if (purge) {
-      throw new UnsupportedOperationException("Purge option is not supported.");
-    }
-    return dropTable(ident);
+  default boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
+    throw new UnsupportedOperationException("Purge table is not supported.");
   }
 
   /**
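
With the default purgeTable now throwing, a caller that wants best-effort
semantics must handle the exception itself. A minimal sketch of such a
fallback (purgeOrDrop is a hypothetical helper, not part of Spark):

    import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

    object CatalogOps {
      // Try a purge first; fall back to a plain drop if the catalog has not
      // overridden purgeTable. Hypothetical helper, for illustration only.
      def purgeOrDrop(catalog: TableCatalog, ident: Identifier): Boolean =
        try catalog.purgeTable(ident)
        catch { case _: UnsupportedOperationException => catalog.dropTable(ident) }
    }
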
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
index dab2091..ef342e7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
@@ -643,6 +643,11 @@ class TableCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(testIdent))
   }
 
+  test("purgeTable") {
+    val catalog = newCatalog()
+    intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
+  }
+
   test("renameTable") {
     val catalog = newCatalog()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index f89b890..100eaf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -35,7 +35,7 @@ case class DropTableExec(
   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
       invalidateCache()
-      catalog.dropTable(ident, purge)
+      if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
     } else if (!ifExists) {
       throw new NoSuchTableException(ident)
     }
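
For reference, the branch taken here is driven by the PURGE keyword. A minimal
sketch of triggering each path (assumes a SparkSession named spark and a v2
catalog testcat containing ns.tbl):

    // Without PURGE: goes through catalog.dropTable(ident).
    spark.sql("DROP TABLE testcat.ns.tbl")

    // With PURGE: goes through catalog.purgeTable(ident) and throws
    // UnsupportedOperationException unless the catalog overrides it.
    spark.sql("DROP TABLE testcat.ns.tbl PURGE")
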
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
new file mode 100644
index 0000000..a272f64
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.InMemoryTableSessionCatalog
+import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
+
+class DropTableSuite extends command.DropTableSuiteBase with CommandSuiteBase {
+  test("purge option") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      createTable(t)
+      val errMsg = intercept[UnsupportedOperationException] {
+        sql(s"DROP TABLE $catalog.ns.tbl PURGE")
+      }.getMessage
+      // The default TableCatalog.purgeTable implementation throws an exception.
+      assert(errMsg.contains("Purge table is not supported"))
+    }
+  }
+
+  test("table qualified with the session catalog name") {
+    withSQLConf(
+      V2_SESSION_CATALOG_IMPLEMENTATION.key -> classOf[InMemoryTableSessionCatalog].getName) {
+
+      sql("CREATE TABLE tbl USING json AS SELECT 1 AS i")
+      checkAnswer(
+        sql("SHOW TABLES IN spark_catalog.default").select("tableName"),
+        Row("tbl"))
+
+      sql("DROP TABLE spark_catalog.default.tbl")
+      checkAnswer(
+        sql("SHOW TABLES IN spark_catalog.default").select("tableName"),
+        Seq.empty)
+    }
+  }
+}

