You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/01/21 07:23:51 UTC
[spark] branch master updated: [SPARK-37929][SQL] Support cascade mode for `dropNamespace` API
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new ec5d2a7 [SPARK-37929][SQL] Support cascade mode for `dropNamespace` API
ec5d2a7 is described below
commit ec5d2a76a55a4094d7bb48788a917145d81d47cf
Author: dch nguyen <dc...@gmail.com>
AuthorDate: Fri Jan 21 15:23:05 2022 +0800
[SPARK-37929][SQL] Support cascade mode for `dropNamespace` API
### What changes were proposed in this pull request?
This PR adds a new API `dropNamespace(String[] ns, boolean cascade)` to replace the existing one. The new boolean parameter `cascade`, when true, deletes all namespaces and tables under the namespace being dropped.
It also updates the implementations and tests that use this API.
### Why are the changes needed?
As discussed in [this review comment](https://github.com/apache/spark/pull/35202#discussion_r784463563), the current `dropNamespace` API doesn't support cascade mode, so this PR replaces it with a version that does.
If `cascade` is true, all namespaces and tables under the namespace are deleted; if it is false and the namespace is non-empty, the catalog throws `NonEmptyNamespaceException`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #35246 from dchvn/change_dropnamespace_api.
Authored-by: dch nguyen <dc...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala | 2 +-
.../catalog/DelegatingCatalogExtension.java | 6 ++--
.../sql/connector/catalog/SupportsNamespaces.java | 10 ++++--
.../sql/catalyst/analysis/NonEmptyException.scala | 36 +++++++++++++++++++
.../spark/sql/connector/catalog/CatalogSuite.scala | 6 ++--
.../connector/catalog/InMemoryTableCatalog.scala | 14 +++++---
.../datasources/v2/DropNamespaceExec.scala | 15 +++-----
.../datasources/v2/V2SessionCatalog.scala | 9 +++--
.../datasources/v2/jdbc/JDBCTableCatalog.scala | 4 ++-
.../datasources/v2/V2SessionCatalogSuite.scala | 42 +++++++++++-----------
10 files changed, 95 insertions(+), 49 deletions(-)
diff --git a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
index 284b05c..0c6b270 100644
--- a/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
+++ b/external/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/v2/V2JDBCNamespaceTest.scala
@@ -52,7 +52,7 @@ private[v2] trait V2JDBCNamespaceTest extends SharedSparkSession with DockerInte
.exists(_.contains("catalog comment"))
assert(createCommentWarning === false)
- catalog.dropNamespace(Array("foo"))
+ catalog.dropNamespace(Array("foo"), cascade = false)
assert(catalog.namespaceExists(Array("foo")) === false)
assert(catalog.listNamespaces() === builtinNamespaces)
val msg = intercept[AnalysisException] {
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
index 48a859a..865ac55 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/DelegatingCatalogExtension.java
@@ -155,8 +155,10 @@ public abstract class DelegatingCatalogExtension implements CatalogExtension {
}
@Override
- public boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException {
- return asNamespaceCatalog().dropNamespace(namespace);
+ public boolean dropNamespace(
+ String[] namespace,
+ boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException {
+ return asNamespaceCatalog().dropNamespace(namespace, cascade);
}
@Override
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java
index f70746b..c1a4960 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsNamespaces.java
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connector.catalog;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException;
import java.util.Map;
@@ -136,15 +137,20 @@ public interface SupportsNamespaces extends CatalogPlugin {
NamespaceChange... changes) throws NoSuchNamespaceException;
/**
- * Drop a namespace from the catalog, recursively dropping all objects within the namespace.
+ * Drop a namespace from the catalog with cascade mode, recursively dropping all objects
+ * within the namespace if cascade is true.
* <p>
* If the catalog implementation does not support this operation, it may throw
* {@link UnsupportedOperationException}.
*
* @param namespace a multi-part namespace
+ * @param cascade When true, deletes all objects under the namespace
* @return true if the namespace was dropped
* @throws NoSuchNamespaceException If the namespace does not exist (optional)
+ * @throws NonEmptyNamespaceException If the namespace is non-empty and cascade is false
* @throws UnsupportedOperationException If drop is not a supported operation
*/
- boolean dropNamespace(String[] namespace) throws NoSuchNamespaceException;
+ boolean dropNamespace(
+ String[] namespace,
+ boolean cascade) throws NoSuchNamespaceException, NonEmptyNamespaceException;
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
new file mode 100644
index 0000000..f3ff28f
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/NonEmptyException.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+
+
+/**
+ * Thrown by a catalog when asked to drop a namespace that still contains objects while cascade
+ * is false. It extends [[org.apache.spark.sql.AnalysisException]], so callers can handle it as one.
+ */
+case class NonEmptyNamespaceException(
+ override val message: String,
+ override val cause: Option[Throwable] = None)
+ extends AnalysisException(message, cause = cause) {
+
+ def this(namespace: Array[String]) = { // message names the quoted multi-part namespace
+ this(s"Namespace '${namespace.quoted}' is non empty.")
+ }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index 0cca1cc..d00bc31 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -820,7 +820,7 @@ class CatalogSuite extends SparkFunSuite {
assert(catalog.namespaceExists(testNs) === false)
- val ret = catalog.dropNamespace(testNs)
+ val ret = catalog.dropNamespace(testNs, cascade = false)
assert(ret === false)
}
@@ -833,7 +833,7 @@ class CatalogSuite extends SparkFunSuite {
assert(catalog.namespaceExists(testNs) === true)
assert(catalog.loadNamespaceMetadata(testNs).asScala === Map("property" -> "value"))
- val ret = catalog.dropNamespace(testNs)
+ val ret = catalog.dropNamespace(testNs, cascade = false)
assert(ret === true)
assert(catalog.namespaceExists(testNs) === false)
@@ -845,7 +845,7 @@ class CatalogSuite extends SparkFunSuite {
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
- assert(catalog.dropNamespace(testNs))
+ assert(catalog.dropNamespace(testNs, cascade = true))
assert(!catalog.namespaceExists(testNs))
intercept[NoSuchNamespaceException](catalog.listTables(testNs))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index d8e6bc4..428aec7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
-import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
+import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NonEmptyNamespaceException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException}
import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{SortOrder, Transform}
import org.apache.spark.sql.types.StructType
@@ -213,10 +213,16 @@ class InMemoryTableCatalog extends BasicInMemoryTableCatalog with SupportsNamesp
namespaces.put(namespace.toList, CatalogV2Util.applyNamespaceChanges(metadata, changes))
}
- override def dropNamespace(namespace: Array[String]): Boolean = {
- listNamespaces(namespace).foreach(dropNamespace)
+ override def dropNamespace(namespace: Array[String], cascade: Boolean): Boolean = {
try {
- listTables(namespace).foreach(dropTable)
+ if (!cascade) {
+ if (listTables(namespace).nonEmpty || listNamespaces(namespace).nonEmpty) {
+ throw new NonEmptyNamespaceException(namespace)
+ }
+ } else {
+ listNamespaces(namespace).foreach(namespace => dropNamespace(namespace, cascade))
+ listTables(namespace).foreach(dropTable)
+ }
} catch {
case _: NoSuchNamespaceException =>
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala
index 9a9d8e1..5d30205 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DropNamespaceExec.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.NonEmptyNamespaceException
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -37,17 +38,11 @@ case class DropNamespaceExec(
val nsCatalog = catalog.asNamespaceCatalog
val ns = namespace.toArray
if (nsCatalog.namespaceExists(ns)) {
- // The default behavior of `SupportsNamespace.dropNamespace()` is cascading,
- // so make sure the namespace to drop is empty.
- if (!cascade) {
- if (catalog.asTableCatalog.listTables(ns).nonEmpty
- || nsCatalog.listNamespaces(ns).nonEmpty) {
+ try {
+ nsCatalog.dropNamespace(ns, cascade)
+ } catch {
+ case _: NonEmptyNamespaceException =>
throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
- }
- }
-
- if (!nsCatalog.dropNamespace(ns)) {
- throw QueryCompilationErrors.cannotDropNonemptyNamespaceError(namespace)
}
} else if (!ifExists) {
throw QueryCompilationErrors.noSuchNamespaceError(ns)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 3ea7d0f..d9cfe0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -286,12 +286,11 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
}
- override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
+ override def dropNamespace(
+ namespace: Array[String],
+ cascade: Boolean): Boolean = namespace match {
case Array(db) if catalog.databaseExists(db) =>
- if (catalog.listTables(db).nonEmpty) {
- throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
- }
- catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false)
+ catalog.dropDatabase(db, ignoreIfNotExists = false, cascade)
true
case Array(_) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 5667064..1658f0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -278,7 +278,9 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
}
}
- override def dropNamespace(namespace: Array[String]): Boolean = namespace match {
+ override def dropNamespace(
+ namespace: Array[String],
+ cascade: Boolean): Boolean = namespace match {
case Array(db) if namespaceExists(namespace) =>
if (listTables(Array(db)).nonEmpty) {
throw QueryExecutionErrors.namespaceNotEmptyError(namespace)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 86f4dc4..646eccb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -67,10 +67,10 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
override protected def afterAll(): Unit = {
val catalog = newCatalog()
- catalog.dropNamespace(Array("db"))
- catalog.dropNamespace(Array("db2"))
- catalog.dropNamespace(Array("ns"))
- catalog.dropNamespace(Array("ns2"))
+ catalog.dropNamespace(Array("db"), cascade = true)
+ catalog.dropNamespace(Array("db2"), cascade = true)
+ catalog.dropNamespace(Array("ns"), cascade = true)
+ catalog.dropNamespace(Array("ns2"), cascade = true)
super.afterAll()
}
@@ -811,7 +811,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs))
assert(catalog.listNamespaces(testNs) === Array())
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("listNamespaces: fail if missing namespace") {
@@ -849,7 +849,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(metadata.asScala, Map("property" -> "value"))
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("loadNamespaceMetadata: empty metadata") {
@@ -864,7 +864,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(metadata.asScala, emptyProps.asScala)
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("createNamespace: basic behavior") {
@@ -884,7 +884,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(metadata, Map("property" -> "value"))
assert(expectedPath === metadata("location"))
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("createNamespace: initialize location") {
@@ -900,7 +900,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(metadata, Map.empty)
assert(expectedPath === metadata("location"))
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("createNamespace: relative location") {
@@ -917,7 +917,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(metadata, Map.empty)
assert(expectedPath === metadata("location"))
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("createNamespace: fail if namespace already exists") {
@@ -933,7 +933,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value"))
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("createNamespace: fail nested namespace") {
@@ -948,7 +948,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(exc.getMessage.contains("Invalid namespace name: db.nested"))
- catalog.dropNamespace(Array("db"))
+ catalog.dropNamespace(Array("db"), cascade = false)
}
test("createTable: fail if namespace does not exist") {
@@ -969,7 +969,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === false)
- val ret = catalog.dropNamespace(testNs)
+ val ret = catalog.dropNamespace(testNs, cascade = false)
assert(ret === false)
}
@@ -981,7 +981,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(catalog.namespaceExists(testNs) === true)
- val ret = catalog.dropNamespace(testNs)
+ val ret = catalog.dropNamespace(testNs, cascade = false)
assert(ret === true)
assert(catalog.namespaceExists(testNs) === false)
@@ -993,8 +993,8 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.createNamespace(testNs, Map("property" -> "value").asJava)
catalog.createTable(testIdent, schema, Array.empty, emptyProps)
- val exc = intercept[IllegalStateException] {
- catalog.dropNamespace(testNs)
+ val exc = intercept[AnalysisException] {
+ catalog.dropNamespace(testNs, cascade = false)
}
assert(exc.getMessage.contains(testNs.quoted))
@@ -1002,7 +1002,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value"))
catalog.dropTable(testIdent)
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("alterNamespace: basic behavior") {
@@ -1027,7 +1027,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.loadNamespaceMetadata(testNs).asScala,
Map("property" -> "value"))
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("alterNamespace: update namespace location") {
@@ -1050,7 +1050,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", "relativeP"))
assert(newRelativePath === spark.catalog.getDatabase(testNs(0)).locationUri)
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("alterNamespace: update namespace comment") {
@@ -1065,7 +1065,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(newComment === spark.catalog.getDatabase(testNs(0)).description)
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
test("alterNamespace: fail if namespace doesn't exist") {
@@ -1092,6 +1092,6 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite {
assert(exc.getMessage.contains(s"Cannot remove reserved property: $p"))
}
- catalog.dropNamespace(testNs)
+ catalog.dropNamespace(testNs, cascade = false)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org