You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/10/11 05:38:33 UTC

[spark] branch master updated: [SPARK-36849][SQL] Migrate UseStatement to v2 command framework

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 73edf31  [SPARK-36849][SQL] Migrate UseStatement to v2 command framework
73edf31 is described below

commit 73edf31c63608c6416594a4c8f4a087f10dcd7a2
Author: dohongdayi <do...@126.com>
AuthorDate: Mon Oct 11 13:37:55 2021 +0800

    [SPARK-36849][SQL] Migrate UseStatement to v2 command framework
    
    What changes were proposed in this pull request?
    Migrate `UseStatement` to the v2 command framework, and add `SetNamespaceCommand` to handle `USE NAMESPACE`.
    
    Why are the changes needed?
    Migrate to the standard V2 framework
    
    Does this PR introduce any user-facing change?
    no
    
    How was this patch tested?
    Existing tests, plus a new `USE NAMESPACE` case added to `DataSourceV2SQLSuite`.
    
    Closes #34127 from dohongdayi/use_branch.
    
    Lead-authored-by: dohongdayi <do...@126.com>
    Co-authored-by: Herbert Liao <he...@synnex.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  3 +-
 .../sql/catalyst/analysis/ResolveCatalogs.scala    |  9 ------
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  4 +--
 .../sql/catalyst/plans/logical/statements.scala    |  5 ----
 .../sql/catalyst/plans/logical/v2Commands.scala    | 11 ++++----
 .../spark/sql/execution/SparkSqlParser.scala       |  8 ++++++
 .../execution/command/SetNamespaceCommand.scala    | 33 ++++++++++++++++++++++
 .../datasources/v2/DataSourceV2Strategy.scala      |  6 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  6 ++++
 9 files changed, 61 insertions(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 886810e..3cd39a9 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -106,7 +106,8 @@ singleTableSchema
 statement
     : query                                                            #statementDefault
     | ctes? dmlStatementNoWith                                         #dmlStatement
-    | USE NAMESPACE? multipartIdentifier                               #use
+    | USE multipartIdentifier                                          #use
+    | USE NAMESPACE multipartIdentifier                                #useNamespace
     | SET CATALOG (identifier | STRING)                                #setCatalog
     | CREATE namespace (IF NOT EXISTS)? multipartIdentifier
         (commentSpec |
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index e9204ad..efc1ab2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -82,15 +82,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         convertTableProperties(c),
         writeOptions = c.writeOptions,
         orCreate = c.orCreate)
-
-    case UseStatement(isNamespaceSet, nameParts) =>
-      if (isNamespaceSet) {
-        SetCatalogAndNamespace(catalogManager, None, Some(nameParts))
-      } else {
-        val CatalogAndNamespace(catalog, ns) = nameParts
-        val namespace = if (ns.nonEmpty) Some(ns) else None
-        SetCatalogAndNamespace(catalogManager, Some(catalog.name()), namespace)
-      }
   }
 
   object NonSessionCatalogAndTable {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 6a24a9d..1968142 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3565,11 +3565,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
   }
 
   /**
-   * Create a [[UseStatement]] logical plan.
+   * Create a [[SetCatalogAndNamespace]] command.
    */
   override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
     val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
-    UseStatement(ctx.NAMESPACE != null, nameParts)
+    SetCatalogAndNamespace(UnresolvedDBObjectName(nameParts, isNamespace = true))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 0373c25..c502981 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -272,8 +272,3 @@ case class InsertIntoStatement(
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement =
     copy(query = newChild)
 }
-
-/**
- * A USE statement, as parsed from SQL.
- */
-case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends LeafParsedStatement
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d548aec..e52dc02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -622,12 +622,13 @@ object ShowViews {
 }
 
 /**
- * The logical plan of the USE/USE NAMESPACE command.
+ * The logical plan of the USE command.
  */
-case class SetCatalogAndNamespace(
-    catalogManager: CatalogManager,
-    catalogName: Option[String],
-    namespace: Option[Seq[String]]) extends LeafCommand
+case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand {
+  override protected def withNewChildInternal(newChild: LogicalPlan): SetCatalogAndNamespace = {
+    copy(child = newChild)
+  }
+}
 
 /**
  * The logical plan of the REFRESH TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 21aed5f..78d5354 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -240,6 +240,14 @@ class SparkSqlAstBuilder extends AstBuilder {
   }
 
   /**
+   * Create a [[SetNamespaceCommand]] logical command.
+   */
+  override def visitUseNamespace(ctx: UseNamespaceContext): LogicalPlan = withOrigin(ctx) {
+    val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
+    SetNamespaceCommand(nameParts)
+  }
+
+  /**
    * Create a [[SetCatalogCommand]] logical command.
    */
   override def visitSetCatalog(ctx: SetCatalogContext): LogicalPlan = withOrigin(ctx) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala
new file mode 100644
index 0000000..cef18f7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * The command for `USE NAMESPACE XXX`
+ */
+case class SetNamespaceCommand(namespace: Seq[String]) extends LeafRunnableCommand {
+  override def output: Seq[Attribute] = Seq.empty
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sessionState.catalogManager.setCurrentNamespace(namespace.toArray)
+    Seq.empty
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index db61d61..56e7abc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -332,8 +332,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
       ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
 
-    case SetCatalogAndNamespace(catalogManager, catalogName, ns) =>
-      SetCatalogAndNamespaceExec(catalogManager, catalogName, ns) :: Nil
+    case SetCatalogAndNamespace(ResolvedDBObjectName(catalog, ns)) =>
+      val catalogManager = session.sessionState.catalogManager
+      val namespace = if (ns.nonEmpty) Some(ns) else None
+      SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
 
     case ShowTableProperties(rt: ResolvedTable, propertyKey, output) =>
       ShowTablePropertiesExec(output, rt.table, propertyKey) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a2e147c..70bd8ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1337,6 +1337,7 @@ class DataSourceV2SQLSuite
     sql("CREATE TABLE testcat2.ns2.ns2_2.table (id bigint) USING foo")
     sql("CREATE TABLE testcat2.ns3.ns3_3.table (id bigint) USING foo")
     sql("CREATE TABLE testcat2.testcat.table (id bigint) USING foo")
+    sql("CREATE TABLE testcat2.testcat.ns1.ns1_1.table (id bigint) USING foo")
 
     // Catalog is resolved to 'testcat'.
     sql("USE testcat.ns1.ns1_1")
@@ -1358,6 +1359,11 @@ class DataSourceV2SQLSuite
     assert(catalogManager.currentCatalog.name() == "testcat2")
     assert(catalogManager.currentNamespace === Array("testcat"))
 
+    // Only the namespace is changed (explicit).
+    sql("USE NAMESPACE testcat.ns1.ns1_1")
+    assert(catalogManager.currentCatalog.name() == "testcat2")
+    assert(catalogManager.currentNamespace === Array("testcat", "ns1", "ns1_1"))
+
     // Catalog is resolved to `testcat`.
     sql("USE testcat")
     assert(catalogManager.currentCatalog.name() == "testcat")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org