You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/12/23 02:56:17 UTC
[spark] branch branch-3.1 updated: [SPARK-33364][SQL][FOLLOWUP]
Refine the catalog v2 API to purge a table
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 60a1e59 [SPARK-33364][SQL][FOLLOWUP] Refine the catalog v2 API to purge a table
60a1e59 is described below
commit 60a1e59cec146b02bb19ad2542037fb0fa276afc
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Dec 23 11:47:13 2020 +0900
[SPARK-33364][SQL][FOLLOWUP] Refine the catalog v2 API to purge a table
This is a followup of https://github.com/apache/spark/pull/30267
Inspired by https://github.com/apache/spark/pull/30886, this change replaces the overloaded pair `def dropTable(ident)` and `def dropTable(ident, purge)` with two separate methods, `def dropTable` and `def purgeTable`. This is better because:
1. It makes the APIs orthogonal. Previously, `def dropTable(ident, purge)` called `def dropTable(ident)` and was a superset of it.
2. It simplifies catalog implementations a little. The `if (purge) ... else ...` check is now done on the Spark side.
Does this PR introduce any user-facing change? No.
How was this patch tested? Existing tests.
Closes #30890 from cloud-fan/purgeTable.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: HyukjinKwon <gu...@apache.org>
(cherry picked from commit ec1560af251d2c3580f5bccfabc750f1c7af09df)
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
.../catalog/DelegatingCatalogExtension.java | 5 +++
.../spark/sql/connector/catalog/TableCatalog.java | 17 +++----
.../sql/connector/catalog/TableCatalogSuite.scala | 5 +++
.../execution/datasources/v2/DropTableExec.scala | 2 +-
.../sql/execution/command/v2/DropTableSuite.scala | 52 ++++++++++++++++++++++
5 files changed, 70 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index d07d299..34f07b1 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -100,6 +100,11 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension {
}
@Override
+ public boolean purgeTable(Identifier ident) {
+ return asTableCatalog().purgeTable(ident);
+ }
+
+ @Override
public void renameTable(
Identifier oldIdent,
Identifier newIdent) throws NoSuchTableException, TableAlreadyExistsException {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
index 52a74ab..4163d86 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java
@@ -173,26 +173,23 @@ public interface TableCatalog extends CatalogPlugin {
boolean dropTable(Identifier ident);
/**
- * Drop a table in the catalog with an option to purge.
+ * Drop a table in the catalog and completely remove its data by skipping a trash even if it is
+ * supported.
* <p>
* If the catalog supports views and contains a view for the identifier and not a table, this
* must not drop the view and must return false.
* <p>
- * If the catalog supports the option to purge a table, this method must be overridden.
- * The default implementation falls back to {@link #dropTable(Identifier)} dropTable} if the
- * purge option is set to false. Otherwise, it throws {@link UnsupportedOperationException}.
+ * If the catalog supports to purge a table, this method should be overridden.
+ * The default implementation throws {@link UnsupportedOperationException}.
*
* @param ident a table identifier
- * @param purge whether a table should be purged
* @return true if a table was deleted, false if no table exists for the identifier
+ * @throws UnsupportedOperationException If table purging is not supported
*
* @since 3.1.0
*/
- default boolean dropTable(Identifier ident, boolean purge) {
- if (purge) {
- throw new UnsupportedOperationException("Purge option is not supported.");
- }
- return dropTable(ident);
+ default boolean purgeTable(Identifier ident) throws UnsupportedOperationException {
+ throw new UnsupportedOperationException("Purge table is not supported.");
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
index dab2091..ef342e7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
@@ -643,6 +643,11 @@ class TableCatalogSuite extends SparkFunSuite {
assert(!catalog.tableExists(testIdent))
}
+ test("purgeTable") {
+ val catalog = newCatalog()
+ intercept[UnsupportedOperationException](catalog.purgeTable(testIdent))
+ }
+
test("renameTable") {
val catalog = newCatalog()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
index f89b890..100eaf9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropTableExec.scala
@@ -35,7 +35,7 @@ case class DropTableExec(
override def run(): Seq[InternalRow] = {
if (catalog.tableExists(ident)) {
invalidateCache()
- catalog.dropTable(ident, purge)
+ if (purge) catalog.purgeTable(ident) else catalog.dropTable(ident)
} else if (!ifExists) {
throw new NoSuchTableException(ident)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
new file mode 100644
index 0000000..a272f64
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DropTableSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.connector.InMemoryTableSessionCatalog
+import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
+
+class DropTableSuite extends command.DropTableSuiteBase with CommandSuiteBase {
+ test("purge option") {
+ withNamespaceAndTable("ns", "tbl") { t =>
+ createTable(t)
+ val errMsg = intercept[UnsupportedOperationException] {
+ sql(s"DROP TABLE $catalog.ns.tbl PURGE")
+ }.getMessage
+ // The default TableCatalog.purgeTable implementation throws an exception.
+ assert(errMsg.contains("Purge table is not supported"))
+ }
+ }
+
+ test("table qualified with the session catalog name") {
+ withSQLConf(
+ V2_SESSION_CATALOG_IMPLEMENTATION.key -> classOf[InMemoryTableSessionCatalog].getName) {
+
+ sql("CREATE TABLE tbl USING json AS SELECT 1 AS i")
+ checkAnswer(
+ sql("SHOW TABLES IN spark_catalog.default").select("tableName"),
+ Row("tbl"))
+
+ sql("DROP TABLE spark_catalog.default.tbl")
+ checkAnswer(
+ sql("SHOW TABLES IN spark_catalog.default").select("tableName"),
+ Seq.empty)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org