You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/26 12:36:45 UTC
[spark] branch master updated: [SPARK-36587][SQL] Migrate
CreateNamespaceStatement to v2 command framework
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 72d6d64 [SPARK-36587][SQL] Migrate CreateNamespaceStatement to v2 command framework
72d6d64 is described below
commit 72d6d648358883669c648ce4a5c88aed98723829
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Aug 26 20:36:04 2021 +0800
[SPARK-36587][SQL] Migrate CreateNamespaceStatement to v2 command framework
### What changes were proposed in this pull request?
This PR migrates CreateNamespaceStatement to the v2 command framework. Two new logical plans `UnresolvedDBObjectName` and `ResolvedDBObjectName` are introduced to support these CreateXXXStatements.
### Why are the changes needed?
Avoid duplicated code
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #33835 from cloud-fan/ddl.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/ResolveCatalogs.scala | 7 +++----
.../spark/sql/catalyst/analysis/v2ResolutionPlans.scala | 17 +++++++++++++++++
.../apache/spark/sql/catalyst/parser/AstBuilder.scala | 8 +++++---
.../spark/sql/catalyst/plans/logical/v2Commands.scala | 9 ++++++---
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 10 +++++-----
.../sql/catalyst/analysis/ResolveSessionCatalog.scala | 8 ++++----
.../execution/datasources/v2/DataSourceV2Strategy.scala | 6 +++---
7 files changed, 43 insertions(+), 22 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 1365cf6..64f37b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -31,6 +31,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
import org.apache.spark.sql.connector.catalog.CatalogV2Util._
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case UnresolvedDBObjectName(CatalogAndNamespace(catalog, name), isNamespace) if isNamespace =>
+ ResolvedDBObjectName(catalog, name)
+
case c @ CreateTableStatement(
NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
CreateV2Table(
@@ -77,10 +80,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
writeOptions = c.writeOptions,
orCreate = c.orCreate)
- case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
- if !isSessionCatalog(catalog) =>
- CreateNamespace(catalog.asNamespaceCatalog, ns, c.ifNotExists, c.properties)
-
case UseStatement(isNamespaceSet, nameParts) =>
if (isNamespaceSet) {
SetCatalogAndNamespace(catalogManager, None, Some(nameParts))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 625e3ff..9ddaebf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -125,6 +125,16 @@ case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode {
}
/**
+ * Holds the name of a database object (table, view, namespace, function, etc.) that is to be
+ * created and we need to determine the catalog to store it. It will be resolved to
+ * [[ResolvedDBObjectName]] during analysis.
+ */
+case class UnresolvedDBObjectName(nameParts: Seq[String], isNamespace: Boolean) extends LeafNode {
+ override lazy val resolved: Boolean = false
+ override def output: Seq[Attribute] = Nil
+}
+
+/**
* A plan containing resolved namespace.
*/
case class ResolvedNamespace(catalog: CatalogPlugin, namespace: Seq[String])
@@ -188,3 +198,10 @@ case class ResolvedFunc(identifier: Identifier)
extends LeafNode {
override def output: Seq[Attribute] = Nil
}
+
+/**
+ * A plan containing resolved database object name with catalog determined.
+ */
+case class ResolvedDBObjectName(catalog: CatalogPlugin, nameParts: Seq[String]) extends LeafNode {
+ override def output: Seq[Attribute] = Nil
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fcbc6d2..10bb864 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2999,7 +2999,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * Create a [[CreateNamespaceStatement]] command.
+ * Create a [[CreateNamespace]] command.
*
* For example:
* {{{
@@ -3037,8 +3037,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
properties += PROP_LOCATION -> _
}
- CreateNamespaceStatement(
- visitMultipartIdentifier(ctx.multipartIdentifier),
+ CreateNamespace(
+ UnresolvedDBObjectName(
+ visitMultipartIdentifier(ctx.multipartIdentifier),
+ isNamespace = true),
ctx.EXISTS != null,
properties)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 195bb8c..07010f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -291,10 +291,13 @@ case class ReplaceTableAsSelect(
* The logical plan of the CREATE NAMESPACE command.
*/
case class CreateNamespace(
- catalog: SupportsNamespaces,
- namespace: Seq[String],
+ name: LogicalPlan,
ifNotExists: Boolean,
- properties: Map[String, String]) extends LeafCommand
+ properties: Map[String, String]) extends UnaryCommand {
+ override def child: LogicalPlan = name
+ override protected def withNewChildInternal(newChild: LogicalPlan): CreateNamespace =
+ copy(name = newChild)
+}
/**
* The logical plan of the DROP NAMESPACE command.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index a1d9f89..b3765fa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
import java.util.Locale
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -1696,8 +1696,8 @@ class DDLParserSuite extends AnalysisTest {
}
test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") {
- val expected = CreateNamespaceStatement(
- Seq("a", "b", "c"),
+ val expected = CreateNamespace(
+ UnresolvedDBObjectName(Seq("a", "b", "c"), true),
ifNotExists = true,
Map(
"a" -> "a",
@@ -1769,8 +1769,8 @@ class DDLParserSuite extends AnalysisTest {
""".stripMargin
comparePlans(
parsePlan(sql),
- CreateNamespaceStatement(
- Seq("a", "b", "c"),
+ CreateNamespace(
+ UnresolvedDBObjectName(Seq("a", "b", "c"), true),
ifNotExists = false,
Map(
"a" -> "1",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 80063cd..6f124f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -255,16 +255,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case DropView(r: ResolvedView, ifExists) =>
DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false)
- case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
+ case c @ CreateNamespace(ResolvedDBObjectName(catalog, name), _, _)
if isSessionCatalog(catalog) =>
- if (ns.length != 1) {
- throw QueryCompilationErrors.invalidDatabaseNameError(ns.quoted)
+ if (name.length != 1) {
+ throw QueryCompilationErrors.invalidDatabaseNameError(name.quoted)
}
val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
- CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties)
+ CreateDatabaseCommand(name.head, c.ifNotExists, location, comment, newProperties)
case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) =>
DropDatabaseCommand(db, d.ifExists, d.cascade)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1a50c32..ae8a092 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
import scala.collection.JavaConverters._
import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
@@ -314,8 +314,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
ns,
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
- case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
- CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil
+ case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) =>
+ CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, properties) :: Nil
case DropNamespace(ResolvedNamespace(catalog, ns), ifExists, cascade) =>
DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org