You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/26 12:36:45 UTC

[spark] branch master updated: [SPARK-36587][SQL] Migrate CreateNamespaceStatement to v2 command framework

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 72d6d64  [SPARK-36587][SQL] Migrate CreateNamespaceStatement to v2 command framework
72d6d64 is described below

commit 72d6d648358883669c648ce4a5c88aed98723829
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Aug 26 20:36:04 2021 +0800

    [SPARK-36587][SQL] Migrate CreateNamespaceStatement to v2 command framework
    
    ### What changes were proposed in this pull request?
    
    This PR migrates CreateNamespaceStatement to the v2 command framework. Two new logical plans, `UnresolvedDBObjectName` and `ResolvedDBObjectName`, are introduced to support migrating the CreateXXXStatement plans.
    
    ### Why are the changes needed?
    
    Avoid duplicated code
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #33835 from cloud-fan/ddl.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/ResolveCatalogs.scala   |  7 +++----
 .../spark/sql/catalyst/analysis/v2ResolutionPlans.scala | 17 +++++++++++++++++
 .../apache/spark/sql/catalyst/parser/AstBuilder.scala   |  8 +++++---
 .../spark/sql/catalyst/plans/logical/v2Commands.scala   |  9 ++++++---
 .../spark/sql/catalyst/parser/DDLParserSuite.scala      | 10 +++++-----
 .../sql/catalyst/analysis/ResolveSessionCatalog.scala   |  8 ++++----
 .../execution/datasources/v2/DataSourceV2Strategy.scala |  6 +++---
 7 files changed, 43 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 1365cf6..64f37b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -31,6 +31,9 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
   import org.apache.spark.sql.connector.catalog.CatalogV2Util._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+    case UnresolvedDBObjectName(CatalogAndNamespace(catalog, name), isNamespace) if isNamespace =>
+      ResolvedDBObjectName(catalog, name)
+
     case c @ CreateTableStatement(
          NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _, _, _, _, _, _, _) =>
       CreateV2Table(
@@ -77,10 +80,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         writeOptions = c.writeOptions,
         orCreate = c.orCreate)
 
-    case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
-        if !isSessionCatalog(catalog) =>
-      CreateNamespace(catalog.asNamespaceCatalog, ns, c.ifNotExists, c.properties)
-
     case UseStatement(isNamespaceSet, nameParts) =>
       if (isNamespaceSet) {
         SetCatalogAndNamespace(catalogManager, None, Some(nameParts))
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 625e3ff..9ddaebf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -125,6 +125,16 @@ case class UnresolvedFunc(multipartIdentifier: Seq[String]) extends LeafNode {
 }
 
 /**
+ * Holds the name of a database object (table, view, namespace, function, etc.) that is to be
+ * created and we need to determine the catalog to store it. It will be resolved to
+ * [[ResolvedDBObjectName]] during analysis.
+ */
+case class UnresolvedDBObjectName(nameParts: Seq[String], isNamespace: Boolean) extends LeafNode {
+  override lazy val resolved: Boolean = false
+  override def output: Seq[Attribute] = Nil
+}
+
+/**
  * A plan containing resolved namespace.
  */
 case class ResolvedNamespace(catalog: CatalogPlugin, namespace: Seq[String])
@@ -188,3 +198,10 @@ case class ResolvedFunc(identifier: Identifier)
   extends LeafNode {
   override def output: Seq[Attribute] = Nil
 }
+
+/**
+ * A plan containing resolved database object name with catalog determined.
+ */
+case class ResolvedDBObjectName(catalog: CatalogPlugin, nameParts: Seq[String]) extends LeafNode {
+  override def output: Seq[Attribute] = Nil
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index fcbc6d2..10bb864 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2999,7 +2999,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a [[CreateNamespaceStatement]] command.
+   * Create a [[CreateNamespace]] command.
    *
    * For example:
    * {{{
@@ -3037,8 +3037,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
       properties += PROP_LOCATION -> _
     }
 
-    CreateNamespaceStatement(
-      visitMultipartIdentifier(ctx.multipartIdentifier),
+    CreateNamespace(
+      UnresolvedDBObjectName(
+        visitMultipartIdentifier(ctx.multipartIdentifier),
+        isNamespace = true),
       ctx.EXISTS != null,
       properties)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index 195bb8c..07010f7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -291,10 +291,13 @@ case class ReplaceTableAsSelect(
  * The logical plan of the CREATE NAMESPACE command.
  */
 case class CreateNamespace(
-    catalog: SupportsNamespaces,
-    namespace: Seq[String],
+    name: LogicalPlan,
     ifNotExists: Boolean,
-    properties: Map[String, String]) extends LeafCommand
+    properties: Map[String, String]) extends UnaryCommand {
+  override def child: LogicalPlan = name
+  override protected def withNewChildInternal(newChild: LogicalPlan): CreateNamespace =
+    copy(name = newChild)
+}
 
 /**
  * The logical plan of the DROP NAMESPACE command.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index a1d9f89..b3765fa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.parser
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, PersistedView, UnresolvedAttribute, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedFunc, UnresolvedInlineTable, UnresolvedNamespace, UnresolvedRelation, UnresolvedStar, UnresolvedTable, UnresolvedTableOrView, UnresolvedView}
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, BucketSpec, FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -1696,8 +1696,8 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") {
-    val expected = CreateNamespaceStatement(
-      Seq("a", "b", "c"),
+    val expected = CreateNamespace(
+      UnresolvedDBObjectName(Seq("a", "b", "c"), true),
       ifNotExists = true,
       Map(
         "a" -> "a",
@@ -1769,8 +1769,8 @@ class DDLParserSuite extends AnalysisTest {
       """.stripMargin
     comparePlans(
       parsePlan(sql),
-      CreateNamespaceStatement(
-        Seq("a", "b", "c"),
+      CreateNamespace(
+        UnresolvedDBObjectName(Seq("a", "b", "c"), true),
         ifNotExists = false,
         Map(
           "a" -> "1",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 80063cd..6f124f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -255,16 +255,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case DropView(r: ResolvedView, ifExists) =>
       DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false)
 
-    case c @ CreateNamespaceStatement(CatalogAndNamespace(catalog, ns), _, _)
+    case c @ CreateNamespace(ResolvedDBObjectName(catalog, name), _, _)
         if isSessionCatalog(catalog) =>
-      if (ns.length != 1) {
-        throw QueryCompilationErrors.invalidDatabaseNameError(ns.quoted)
+      if (name.length != 1) {
+        throw QueryCompilationErrors.invalidDatabaseNameError(name.quoted)
       }
 
       val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
       val location = c.properties.get(SupportsNamespaces.PROP_LOCATION)
       val newProperties = c.properties -- CatalogV2Util.NAMESPACE_RESERVED_PROPERTIES
-      CreateDatabaseCommand(ns.head, c.ifNotExists, location, comment, newProperties)
+      CreateDatabaseCommand(name.head, c.ifNotExists, location, comment, newProperties)
 
     case d @ DropNamespace(DatabaseInSessionCatalog(db), _, _) =>
       DropDatabaseCommand(db, d.ifExists, d.cascade)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1a50c32..ae8a092 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -314,8 +314,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         ns,
         Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
 
-    case CreateNamespace(catalog, namespace, ifNotExists, properties) =>
-      CreateNamespaceExec(catalog, namespace, ifNotExists, properties) :: Nil
+    case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) =>
+      CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, properties) :: Nil
 
     case DropNamespace(ResolvedNamespace(catalog, ns), ifExists, cascade) =>
       DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org