You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/10/11 05:38:33 UTC
[spark] branch master updated: [SPARK-36849][SQL] Migrate UseStatement to v2 command framework
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 73edf31 [SPARK-36849][SQL] Migrate UseStatement to v2 command framework
73edf31 is described below
commit 73edf31c63608c6416594a4c8f4a087f10dcd7a2
Author: dohongdayi <do...@126.com>
AuthorDate: Mon Oct 11 13:37:55 2021 +0800
[SPARK-36849][SQL] Migrate UseStatement to v2 command framework
What changes were proposed in this pull request?
Migrate `UseStatement` to the v2 command framework and add `SetNamespaceCommand` to handle `USE NAMESPACE`.
Why are the changes needed?
To migrate the `USE` command to the standard v2 command framework.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing tests, plus a new `USE NAMESPACE` case added to `DataSourceV2SQLSuite`.
Closes #34127 from dohongdayi/use_branch.
Lead-authored-by: dohongdayi <do...@126.com>
Co-authored-by: Herbert Liao <he...@synnex.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 3 +-
.../sql/catalyst/analysis/ResolveCatalogs.scala | 9 ------
.../spark/sql/catalyst/parser/AstBuilder.scala | 4 +--
.../sql/catalyst/plans/logical/statements.scala | 5 ----
.../sql/catalyst/plans/logical/v2Commands.scala | 11 ++++----
.../spark/sql/execution/SparkSqlParser.scala | 8 ++++++
.../execution/command/SetNamespaceCommand.scala | 33 ++++++++++++++++++++++
.../datasources/v2/DataSourceV2Strategy.scala | 6 ++--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 6 ++++
9 files changed, 61 insertions(+), 24 deletions(-)
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 886810e..3cd39a9 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -106,7 +106,8 @@ singleTableSchema
statement
: query #statementDefault
| ctes? dmlStatementNoWith #dmlStatement
- | USE NAMESPACE? multipartIdentifier #use
+ | USE multipartIdentifier #use
+ | USE NAMESPACE multipartIdentifier #useNamespace
| SET CATALOG (identifier | STRING) #setCatalog
| CREATE namespace (IF NOT EXISTS)? multipartIdentifier
(commentSpec |
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index e9204ad..efc1ab2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -82,15 +82,6 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
convertTableProperties(c),
writeOptions = c.writeOptions,
orCreate = c.orCreate)
-
- case UseStatement(isNamespaceSet, nameParts) =>
- if (isNamespaceSet) {
- SetCatalogAndNamespace(catalogManager, None, Some(nameParts))
- } else {
- val CatalogAndNamespace(catalog, ns) = nameParts
- val namespace = if (ns.nonEmpty) Some(ns) else None
- SetCatalogAndNamespace(catalogManager, Some(catalog.name()), namespace)
- }
}
object NonSessionCatalogAndTable {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 6a24a9d..1968142 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3565,11 +3565,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
}
/**
- * Create a [[UseStatement]] logical plan.
+ * Create a [[SetCatalogAndNamespace]] command.
*/
override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
- UseStatement(ctx.NAMESPACE != null, nameParts)
+ SetCatalogAndNamespace(UnresolvedDBObjectName(nameParts, isNamespace = true))
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 0373c25..c502981 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -272,8 +272,3 @@ case class InsertIntoStatement(
override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement =
copy(query = newChild)
}
-
-/**
- * A USE statement, as parsed from SQL.
- */
-case class UseStatement(isNamespaceSet: Boolean, nameParts: Seq[String]) extends LeafParsedStatement
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index d548aec..e52dc02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -622,12 +622,13 @@ object ShowViews {
}
/**
- * The logical plan of the USE/USE NAMESPACE command.
+ * The logical plan of the USE command.
*/
-case class SetCatalogAndNamespace(
- catalogManager: CatalogManager,
- catalogName: Option[String],
- namespace: Option[Seq[String]]) extends LeafCommand
+case class SetCatalogAndNamespace(child: LogicalPlan) extends UnaryCommand {
+ override protected def withNewChildInternal(newChild: LogicalPlan): SetCatalogAndNamespace = {
+ copy(child = newChild)
+ }
+}
/**
* The logical plan of the REFRESH TABLE command.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 21aed5f..78d5354 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -240,6 +240,14 @@ class SparkSqlAstBuilder extends AstBuilder {
}
/**
+ * Create a [[SetNamespaceCommand]] logical command.
+ */
+ override def visitUseNamespace(ctx: UseNamespaceContext): LogicalPlan = withOrigin(ctx) {
+ val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
+ SetNamespaceCommand(nameParts)
+ }
+
+ /**
* Create a [[SetCatalogCommand]] logical command.
*/
override def visitSetCatalog(ctx: SetCatalogContext): LogicalPlan = withOrigin(ctx) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala
new file mode 100644
index 0000000..cef18f7
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetNamespaceCommand.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+/**
+ * The command for `USE NAMESPACE XXX`
+ */
+case class SetNamespaceCommand(namespace: Seq[String]) extends LeafRunnableCommand {
+ override def output: Seq[Attribute] = Seq.empty
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ sparkSession.sessionState.catalogManager.setCurrentNamespace(namespace.toArray)
+ Seq.empty
+ }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index db61d61..56e7abc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -332,8 +332,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
- case SetCatalogAndNamespace(catalogManager, catalogName, ns) =>
- SetCatalogAndNamespaceExec(catalogManager, catalogName, ns) :: Nil
+ case SetCatalogAndNamespace(ResolvedDBObjectName(catalog, ns)) =>
+ val catalogManager = session.sessionState.catalogManager
+ val namespace = if (ns.nonEmpty) Some(ns) else None
+ SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
case ShowTableProperties(rt: ResolvedTable, propertyKey, output) =>
ShowTablePropertiesExec(output, rt.table, propertyKey) :: Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index a2e147c..70bd8ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1337,6 +1337,7 @@ class DataSourceV2SQLSuite
sql("CREATE TABLE testcat2.ns2.ns2_2.table (id bigint) USING foo")
sql("CREATE TABLE testcat2.ns3.ns3_3.table (id bigint) USING foo")
sql("CREATE TABLE testcat2.testcat.table (id bigint) USING foo")
+ sql("CREATE TABLE testcat2.testcat.ns1.ns1_1.table (id bigint) USING foo")
// Catalog is resolved to 'testcat'.
sql("USE testcat.ns1.ns1_1")
@@ -1358,6 +1359,11 @@ class DataSourceV2SQLSuite
assert(catalogManager.currentCatalog.name() == "testcat2")
assert(catalogManager.currentNamespace === Array("testcat"))
+ // Only the namespace is changed (explicit).
+ sql("USE NAMESPACE testcat.ns1.ns1_1")
+ assert(catalogManager.currentCatalog.name() == "testcat2")
+ assert(catalogManager.currentNamespace === Array("testcat", "ns1", "ns1_1"))
+
// Catalog is resolved to `testcat`.
sql("USE testcat")
assert(catalogManager.currentCatalog.name() == "testcat")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org