You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/14 05:30:58 UTC
[spark] branch master updated: [SPARK-39767][SQL] Remove UnresolvedDBObjectName and add UnresolvedIdentifier
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fc157d8d490 [SPARK-39767][SQL] Remove UnresolvedDBObjectName and add UnresolvedIdentifier
fc157d8d490 is described below
commit fc157d8d4908f473ce32c2696bb9013dcaecd705
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jul 14 13:30:37 2022 +0800
[SPARK-39767][SQL] Remove UnresolvedDBObjectName and add UnresolvedIdentifier
### What changes were proposed in this pull request?
This PR removes `UnresolvedDBObjectName` and adds a new `UnresolvedIdentifier` to replace it. The same for the resolved plans. The motivation is, `UnresolvedDBObjectName` has a `isNamespace` flag. If it's true, then `UnresolvedDBObjectName` has no difference from `UnresolvedNamespace`. It's a bit weird to have 2 options to do the same thing. If we need to resolve a namespace, we should always use `UnresolvedNamespace`.
### Why are the changes needed?
code simplification
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
existing tests
Closes #37178 from cloud-fan/refactor.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 21 -----------
.../sql/catalyst/analysis/ResolveCatalogs.scala | 35 +++++++++---------
.../sql/catalyst/analysis/v2ResolutionPlans.scala | 15 ++++----
.../spark/sql/catalyst/parser/AstBuilder.scala | 18 ++++------
.../sql/catalyst/plans/logical/v2Commands.scala | 14 +++-----
.../CreateTablePartitioningValidationSuite.scala | 14 ++++----
.../spark/sql/catalyst/parser/DDLParserSuite.scala | 12 +++----
.../org/apache/spark/sql/DataFrameWriter.scala | 11 +++---
.../org/apache/spark/sql/DataFrameWriterV2.scala | 6 ++--
.../catalyst/analysis/ResolveSessionCatalog.scala | 42 +++++++++++-----------
.../spark/sql/execution/SparkSqlParser.scala | 10 ++----
.../datasources/v2/DataSourceV2Strategy.scala | 30 ++++++++--------
.../apache/spark/sql/internal/CatalogImpl.scala | 4 +--
.../spark/sql/streaming/DataStreamWriter.scala | 6 ++--
.../connector/V2CommandsCaseSensitivitySuite.scala | 10 +++---
.../command/CreateNamespaceParserSuite.scala | 6 ++--
.../sql/execution/command/DDLParserSuite.scala | 22 ++++++------
.../execution/command/PlanResolutionSuite.scala | 26 +++++++-------
18 files changed, 131 insertions(+), 171 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e72ebb29002..7667b4fef71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -278,7 +278,6 @@ class Analyzer(override val catalogManager: CatalogManager)
KeepLegacyOutputs),
Batch("Resolution", fixedPoint,
ResolveTableValuedFunctions(v1SessionCatalog) ::
- ResolveNamespace(catalogManager) ::
new ResolveCatalogs(catalogManager) ::
ResolveUserSpecifiedColumns ::
ResolveInsertInto ::
@@ -860,26 +859,6 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}
- case class ResolveNamespace(catalogManager: CatalogManager)
- extends Rule[LogicalPlan] with LookupCatalog {
- def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
- s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
- case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
- s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
- case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
- s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
- case s @ ShowFunctions(UnresolvedNamespace(Seq()), _, _, _, _) =>
- s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
- case a @ AnalyzeTables(UnresolvedNamespace(Seq()), _) =>
- a.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
- case UnresolvedNamespace(Seq()) =>
- ResolvedNamespace(currentCatalog, Seq.empty[String])
- case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
- ResolvedNamespace(catalog, ns)
- }
- }
-
private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
AnalysisContext.get.referredTempViewNames.exists { n =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 8af1d6c6023..9893384b709 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -19,29 +19,30 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
/**
- * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
- * to the corresponding v2 commands if the resolved catalog is not the session catalog.
+ * Resolves the catalog of the name parts for table/view/function/namespace.
*/
class ResolveCatalogs(val catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
- case UnresolvedDBObjectName(CatalogAndNamespace(catalog, name), isNamespace) if isNamespace =>
- ResolvedDBObjectName(catalog, name)
-
- case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
- ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())
- }
-
- object NonSessionCatalogAndTable {
- def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
- case NonSessionCatalogAndIdentifier(catalog, ident) =>
- Some(catalog -> ident.asMultipartIdentifier)
- case _ => None
- }
+ case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) =>
+ ResolvedIdentifier(catalog, identifier)
+ case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
+ s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+ case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
+ s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+ case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
+ s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+ case s @ ShowFunctions(UnresolvedNamespace(Seq()), _, _, _, _) =>
+ s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+ case a @ AnalyzeTables(UnresolvedNamespace(Seq()), _) =>
+ a.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+ case UnresolvedNamespace(Seq()) =>
+ ResolvedNamespace(currentCatalog, Seq.empty[String])
+ case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
+ ResolvedNamespace(catalog, ns)
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index a87f9e0082d..6095d812d66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -131,15 +131,15 @@ case class UnresolvedFunc(
}
/**
- * Holds the name of a database object (table, view, namespace, function, etc.) that is to be
- * created and we need to determine the catalog to store it. It will be resolved to
- * [[ResolvedDBObjectName]] during analysis.
+ * Holds the name of a table/view/function identifier that we need to determine the catalog. It will
+ * be resolved to [[ResolvedIdentifier]] during analysis.
*/
-case class UnresolvedDBObjectName(nameParts: Seq[String], isNamespace: Boolean) extends LeafNode {
+case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode {
override lazy val resolved: Boolean = false
override def output: Seq[Attribute] = Nil
}
+
/**
* A resolved leaf node whose statistics has no meaning.
*/
@@ -225,11 +225,10 @@ case class ResolvedNonPersistentFunc(
}
/**
- * A plan containing resolved database object name with catalog determined.
+ * A plan containing resolved identifier with catalog determined.
*/
-case class ResolvedDBObjectName(
+case class ResolvedIdentifier(
catalog: CatalogPlugin,
- nameParts: Seq[String])
- extends LeafNodeWithoutStats {
+ identifier: Identifier) extends LeafNodeWithoutStats {
override def output: Seq[Attribute] = Nil
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 05b3ddca022..890c8f4000f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3193,9 +3193,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
}
CreateNamespace(
- UnresolvedDBObjectName(
- visitMultipartIdentifier(ctx.multipartIdentifier),
- isNamespace = true),
+ UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)),
ctx.EXISTS != null,
properties)
}
@@ -3587,16 +3585,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
ctx)
case Some(query) =>
- CreateTableAsSelect(
- UnresolvedDBObjectName(table, isNamespace = false),
+ CreateTableAsSelect(UnresolvedIdentifier(table),
partitioning, query, tableSpec, Map.empty, ifNotExists)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
- CreateTable(
- UnresolvedDBObjectName(table, isNamespace = false),
+ CreateTable(UnresolvedIdentifier(table),
schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
}
}
@@ -3659,16 +3655,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
ctx)
case Some(query) =>
- ReplaceTableAsSelect(
- UnresolvedDBObjectName(table, isNamespace = false),
+ ReplaceTableAsSelect(UnresolvedIdentifier(table),
partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
case _ =>
// Note: table schema includes both the table columns list and the partition columns
// with data type.
val schema = StructType(columns ++ partCols)
- ReplaceTable(
- UnresolvedDBObjectName(table, isNamespace = false),
+ ReplaceTable(UnresolvedIdentifier(table),
schema, partitioning, tableSpec, orCreate = orCreate)
}
}
@@ -3702,7 +3696,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
*/
override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
- SetCatalogAndNamespace(UnresolvedDBObjectName(nameParts, isNamespace = true))
+ SetCatalogAndNamespace(UnresolvedNamespace(nameParts))
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b1b8843aa33..fb35e37feb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.{sources, AnalysisException}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.FunctionResource
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
@@ -275,13 +275,12 @@ case class CreateTable(
partitioning: Seq[Transform],
tableSpec: TableSpec,
ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
override def child: LogicalPlan = name
override def tableName: Identifier = {
assert(child.resolved)
- child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+ child.asInstanceOf[ResolvedIdentifier].identifier
}
override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
@@ -302,7 +301,6 @@ case class CreateTableAsSelect(
tableSpec: TableSpec,
writeOptions: Map[String, String],
ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan {
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
override def tableSchema: StructType = query.schema
override def left: LogicalPlan = name
@@ -310,7 +308,7 @@ case class CreateTableAsSelect(
override def tableName: Identifier = {
assert(left.resolved)
- left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+ left.asInstanceOf[ResolvedIdentifier].identifier
}
override lazy val resolved: Boolean = childrenResolved && {
@@ -345,13 +343,12 @@ case class ReplaceTable(
partitioning: Seq[Transform],
tableSpec: TableSpec,
orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
override def child: LogicalPlan = name
override def tableName: Identifier = {
assert(child.resolved)
- child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+ child.asInstanceOf[ResolvedIdentifier].identifier
}
override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
@@ -375,7 +372,6 @@ case class ReplaceTableAsSelect(
tableSpec: TableSpec,
writeOptions: Map[String, String],
orCreate: Boolean) extends BinaryCommand with V2CreateTablePlan {
- import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
override def tableSchema: StructType = query.schema
override def left: LogicalPlan = name
@@ -390,7 +386,7 @@ case class ReplaceTableAsSelect(
override def tableName: Identifier = {
assert(name.resolved)
- name.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+ name.asInstanceOf[ResolvedIdentifier].identifier
}
override protected def withNewChildrenInternal(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index ced83b31c7f..941d0209ea6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -32,7 +32,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "does_not_exist") :: Nil,
TestRelation2,
tableSpec,
@@ -49,7 +49,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "does_not_exist.z") :: Nil,
TestRelation2,
tableSpec,
@@ -66,7 +66,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "point.z") :: Nil,
TestRelation2,
tableSpec,
@@ -83,7 +83,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
TestRelation2,
tableSpec,
@@ -101,7 +101,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "id") :: Nil,
TestRelation2,
tableSpec,
@@ -115,7 +115,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "point.x") :: Nil,
TestRelation2,
tableSpec,
@@ -129,7 +129,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, "point") :: Nil,
TestRelation2,
tableSpec,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index f06377f36a8..eb3e9baaacd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2097,7 +2097,7 @@ class DDLParserSuite extends AnalysisTest {
plan match {
case create: CreateTable =>
TableSpec(
- create.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+ create.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(create.tableSchema),
create.partitioning,
create.tableSpec.properties,
@@ -2109,7 +2109,7 @@ class DDLParserSuite extends AnalysisTest {
create.tableSpec.external)
case replace: ReplaceTable =>
TableSpec(
- replace.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+ replace.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(replace.tableSchema),
replace.partitioning,
replace.tableSpec.properties,
@@ -2120,7 +2120,7 @@ class DDLParserSuite extends AnalysisTest {
replace.tableSpec.serde)
case ctas: CreateTableAsSelect =>
TableSpec(
- ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+ ctas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(ctas.query).filter(_.resolved).map(_.schema),
ctas.partitioning,
ctas.tableSpec.properties,
@@ -2132,7 +2132,7 @@ class DDLParserSuite extends AnalysisTest {
ctas.tableSpec.external)
case rtas: ReplaceTableAsSelect =>
TableSpec(
- rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+ rtas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
Some(rtas.query).filter(_.resolved).map(_.schema),
rtas.partitioning,
rtas.tableSpec.properties,
@@ -2224,12 +2224,12 @@ class DDLParserSuite extends AnalysisTest {
.putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build())
comparePlans(parsePlan(
"CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
- CreateTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
+ CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
Map.empty[String, String], None, None, None, false), false))
comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " +
"b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
- ReplaceTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
+ ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
Map.empty[String, String], None, None, None, false), false))
// THese ALTER TABLE statements should parse successfully.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e5adab866d9..7e11b8b6d36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
@@ -336,10 +336,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
external = false)
runCommand(df.sparkSession) {
CreateTableAsSelect(
- UnresolvedDBObjectName(
- catalog.name +: ident.namespace.toSeq :+ ident.name,
- isNamespace = false
- ),
+ UnresolvedIdentifier(catalog.name +: ident.namespace.toSeq :+ ident.name),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
@@ -603,7 +600,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
serde = None,
external = false)
ReplaceTableAsSelect(
- UnresolvedDBObjectName(nameParts, isNamespace = false),
+ UnresolvedIdentifier(nameParts),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
@@ -624,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
external = false)
CreateTableAsSelect(
- UnresolvedDBObjectName(nameParts, isNamespace = false),
+ UnresolvedIdentifier(nameParts),
partitioningAsV2,
df.queryExecution.analyzed,
tableSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 93127e6288a..270bf0a948e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
@@ -117,7 +117,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
external = false)
runCommand(
CreateTableAsSelect(
- UnresolvedDBObjectName(tableName, isNamespace = false),
+ UnresolvedIdentifier(tableName),
partitioning.getOrElse(Seq.empty),
logicalPlan,
tableSpec,
@@ -205,7 +205,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
serde = None,
external = false)
runCommand(ReplaceTableAsSelect(
- UnresolvedDBObjectName(tableName, isNamespace = false),
+ UnresolvedIdentifier(tableName),
partitioning.getOrElse(Seq.empty),
logicalPlan,
tableSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 3e39863f5bb..a495e35bf2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -151,19 +151,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _)
+ case c @ CreateTable(ResolvedIdentifier(catalog, ident), _, _, _, _)
if isSessionCatalog(catalog) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
ctas = false)
if (!isV2Provider(provider)) {
- constructV1TableCmd(None, c.tableSpec, name, c.tableSchema, c.partitioning,
+ constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
c
}
- case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, writeOptions, _)
+ case c @ CreateTableAsSelect(ResolvedIdentifier(catalog, ident), _, _, _, writeOptions, _)
if isSessionCatalog(catalog) =>
val (storageFormat, provider) = getStorageFormatAndProvider(
c.tableSpec.provider,
@@ -173,7 +173,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
ctas = true)
if (!isV2Provider(provider)) {
- constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
+ constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning,
c.ignoreIfExists, storageFormat, provider)
} else {
c
@@ -187,7 +187,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
- case c @ ReplaceTable(ResolvedDBObjectName(catalog, _), _, _, _, _)
+ case c @ ReplaceTable(ResolvedIdentifier(catalog, _), _, _, _, _)
if isSessionCatalog(catalog) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
@@ -196,7 +196,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
c
}
- case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
+ case c @ ReplaceTableAsSelect(ResolvedIdentifier(catalog, _), _, _, _, _, _)
if isSessionCatalog(catalog) =>
val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
if (!isV2Provider(provider)) {
@@ -358,11 +358,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
originalText,
query)
- case CreateView(ResolvedDBObjectName(catalog, nameParts), userSpecifiedColumns, comment,
+ case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
properties, originalText, child, allowExisting, replace) =>
if (isSessionCatalog(catalog)) {
CreateViewCommand(
- name = attachSessionCatalog(nameParts.asTableIdentifier),
+ name = attachSessionCatalog(ident.asTableIdentifier),
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
@@ -419,17 +419,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
}
- case CreateFunction(ResolvedDBObjectName(catalog, nameParts),
+ case CreateFunction(ResolvedIdentifier(catalog, ident),
className, resources, ignoreIfExists, replace) =>
if (isSessionCatalog(catalog)) {
- val database = if (nameParts.length > 2) {
- throw QueryCompilationErrors.requiresSinglePartNamespaceError(nameParts)
- } else if (nameParts.length == 2) {
- Some(nameParts.head)
+ val database = if (ident.namespace().length > 1) {
+ throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.asMultipartIdentifier)
+ } else if (ident.namespace().length == 1) {
+ Some(ident.namespace().head)
} else {
None
}
- val identifier = FunctionIdentifier(nameParts.last, database)
+ val identifier = FunctionIdentifier(ident.name(), database)
CreateFunctionCommand(
identifier,
className,
@@ -445,13 +445,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
private def constructV1TableCmd(
query: Option[LogicalPlan],
tableSpec: TableSpec,
- name: Seq[String],
+ ident: Identifier,
tableSchema: StructType,
partitioning: Seq[Transform],
ignoreIfExists: Boolean,
storageFormat: CatalogStorageFormat,
provider: String): CreateTableV1 = {
- val tableDesc = buildCatalogTable(attachSessionCatalog(name.asTableIdentifier), tableSchema,
+ val tableDesc = buildCatalogTable(attachSessionCatalog(ident.asTableIdentifier), tableSchema,
partitioning, tableSpec.properties, provider,
tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -628,12 +628,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
private object DatabaseNameInSessionCatalog {
- def unapply(resolved: ResolvedDBObjectName): Option[String] = resolved match {
- case ResolvedDBObjectName(catalog, _) if !isSessionCatalog(catalog) => None
- case ResolvedDBObjectName(_, Seq(dbName)) => Some(dbName)
+ def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+ case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
+ case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
case _ =>
- assert(resolved.nameParts.length > 1)
- throw QueryCompilationErrors.invalidDatabaseNameError(resolved.nameParts.quoted)
+ assert(resolved.namespace.length > 1)
+ throw QueryCompilationErrors.invalidDatabaseNameError(resolved.namespace.quoted)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a8f7b88625b..a82fc47f427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -27,7 +27,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token}
import org.antlr.v4.runtime.tree.TerminalNode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedDBObjectName, UnresolvedFunc}
+import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunc, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser._
@@ -485,9 +485,7 @@ class SparkSqlAstBuilder extends AstBuilder {
assert(Option(originalText).isDefined,
"'originalText' must be provided to create permanent view")
CreateView(
- UnresolvedDBObjectName(
- visitMultipartIdentifier(ctx.multipartIdentifier),
- false),
+ UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier)),
userSpecifiedColumns,
visitCommentSpecList(ctx.commentSpec()),
properties,
@@ -549,9 +547,7 @@ class SparkSqlAstBuilder extends AstBuilder {
val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
if (ctx.TEMPORARY == null) {
CreateFunction(
- UnresolvedDBObjectName(
- functionIdentifier,
- isNamespace = false),
+ UnresolvedIdentifier(functionIdentifier),
string(ctx.className),
resources.toSeq,
ctx.EXISTS != null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 93445eea51b..16c6b331d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
import org.apache.spark.sql.catalyst.catalog.CatalogUtils
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
@@ -169,50 +169,50 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
- case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
+ case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
tableSpec, ifNotExists) =>
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
schema, tableSpec.provider, "CREATE TABLE")
- CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema,
+ CreateTableExec(catalog.asTableCatalog, ident, newSchema,
partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
- case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
+ case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
options, ifNotExists) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
- AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query),
+ AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
qualifyLocInTableSpec(tableSpec), writeOptions, ifNotExists) :: Nil
case _ =>
- CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query,
+ CreateTableAsSelectExec(catalog.asTableCatalog, ident, parts, query,
planLater(query), qualifyLocInTableSpec(tableSpec), writeOptions, ifNotExists) :: Nil
}
case RefreshTable(r: ResolvedTable) =>
RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
- case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, tableSpec, orCreate) =>
+ case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
val newSchema: StructType =
ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
schema, tableSpec.provider, "CREATE TABLE")
catalog match {
case staging: StagingTableCatalog =>
- AtomicReplaceTableExec(staging, ident.asIdentifier, newSchema, parts,
+ AtomicReplaceTableExec(staging, ident, newSchema, parts,
qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
case _ =>
- ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema, parts,
+ ReplaceTableExec(catalog.asTableCatalog, ident, newSchema, parts,
qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
}
- case ReplaceTableAsSelect(ResolvedDBObjectName(catalog, ident),
+ case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
parts, query, tableSpec, options, orCreate) =>
val writeOptions = new CaseInsensitiveStringMap(options.asJava)
catalog match {
case staging: StagingTableCatalog =>
AtomicReplaceTableAsSelectExec(
staging,
- ident.asIdentifier,
+ ident,
parts,
query,
planLater(query),
@@ -223,7 +223,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case _ =>
ReplaceTableAsSelectExec(
catalog.asTableCatalog,
- ident.asIdentifier,
+ ident,
parts,
query,
planLater(query),
@@ -352,11 +352,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
ns,
Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
- case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) =>
+ case CreateNamespace(ResolvedNamespace(catalog, ns), ifNotExists, properties) =>
val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc =>
properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
}.getOrElse(properties)
- CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, finalProperties) :: Nil
+ CreateNamespaceExec(catalog.asNamespaceCatalog, ns, ifNotExists, finalProperties) :: Nil
case DropNamespace(ResolvedNamespace(catalog, ns), ifExists, cascade) =>
DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil
@@ -367,7 +367,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
- case SetCatalogAndNamespace(ResolvedDBObjectName(catalog, ns)) =>
+ case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns)) =>
val catalogManager = session.sessionState.catalogManager
val namespace = if (ns.nonEmpty) Some(ns) else None
SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 880c084ab6f..8ca11f620a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,7 +23,7 @@ import scala.util.control.NonFatal
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedFunc, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
@@ -657,7 +657,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
external = tableType == CatalogTableType.EXTERNAL)
val plan = CreateTable(
- name = UnresolvedDBObjectName(ident, isNamespace = false),
+ name = UnresolvedIdentifier(ident),
tableSchema = schema,
partitioning = Seq(),
tableSpec = tableSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 0084ca53a26..f1d2e351ebd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.Evolving
import org.apache.spark.api.java.function.VoidFunction2
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
+import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
@@ -298,9 +298,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
None,
false)
val cmd = CreateTable(
- UnresolvedDBObjectName(
- originalMultipartIdentifier,
- isNamespace = false),
+ UnresolvedIdentifier(originalMultipartIdentifier),
df.schema.asNullable,
partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
tableSpec,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index b6a8480814e..08de29be27e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.connector
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedDBObjectName, UnresolvedFieldName, UnresolvedFieldPosition}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, TableSpec}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.Identifier
@@ -49,7 +49,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.identity(ref) :: Nil,
TestRelation2,
tableSpec,
@@ -73,7 +73,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = CreateTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, ref) :: Nil,
TestRelation2,
tableSpec,
@@ -98,7 +98,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = ReplaceTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.identity(ref) :: Nil,
TestRelation2,
tableSpec,
@@ -122,7 +122,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
val tableSpec = TableSpec(Map.empty, None, Map.empty,
None, None, None, false)
val plan = ReplaceTableAsSelect(
- UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+ UnresolvedIdentifier(Array("table_name")),
Expressions.bucket(4, ref) :: Nil,
TestRelation2,
tableSpec,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala
index 6c59512148a..6a2a8647961 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala
@@ -17,14 +17,14 @@
package org.apache.spark.sql.execution.command
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedDBObjectName}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedNamespace}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.CreateNamespace
class CreateNamespaceParserSuite extends AnalysisTest {
test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") {
val expected = CreateNamespace(
- UnresolvedDBObjectName(Seq("a", "b", "c"), true),
+ UnresolvedNamespace(Seq("a", "b", "c")),
ifNotExists = true,
Map(
"a" -> "a",
@@ -98,7 +98,7 @@ class CreateNamespaceParserSuite extends AnalysisTest {
comparePlans(
parsePlan(sql),
CreateNamespace(
- UnresolvedDBObjectName(Seq("a", "b", "c"), true),
+ UnresolvedNamespace(Seq("a", "b", "c")),
ifNotExists = false,
Map(
"a" -> "1",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 05378e32296..49059fe9ef4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
import java.util.Locale
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedDBObjectName, UnresolvedFunc}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedFunc, UnresolvedIdentifier}
import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans
@@ -314,7 +314,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
val parsed1 = parser.parsePlan(v1)
val expected1 = CreateView(
- UnresolvedDBObjectName(Seq("view1"), false),
+ UnresolvedIdentifier(Seq("view1")),
Seq.empty[(String, Option[String])],
None,
Map.empty[String, String],
@@ -354,7 +354,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
""".stripMargin
val parsed1 = parser.parsePlan(v1)
val expected1 = CreateView(
- UnresolvedDBObjectName(Seq("view1"), false),
+ UnresolvedIdentifier(Seq("view1")),
Seq("col1" -> None, "col3" -> Some("hello")),
Some("BLABLA"),
Map("prop1Key" -> "prop1Val"),
@@ -410,35 +410,35 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
test("CREATE FUNCTION") {
comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun", Seq(), false, false))
+ CreateFunction(UnresolvedIdentifier(Seq("a")), "fun", Seq(), false, false))
comparePlans(parser.parsePlan("CREATE FUNCTION a.b.c as 'fun'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, false))
+ CreateFunction(UnresolvedIdentifier(Seq("a", "b", "c")), "fun", Seq(), false, false))
comparePlans(parser.parsePlan("CREATE OR REPLACE FUNCTION a.b.c as 'fun'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, true))
+ CreateFunction(UnresolvedIdentifier(Seq("a", "b", "c")), "fun", Seq(), false, true))
comparePlans(parser.parsePlan("CREATE TEMPORARY FUNCTION a as 'fun'"),
CreateFunctionCommand(Seq("a").asFunctionIdentifier, "fun", Seq(), true, false, false))
comparePlans(parser.parsePlan("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), true, false))
+ CreateFunction(UnresolvedIdentifier(Seq("a", "b", "c")), "fun", Seq(), true, false))
comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
Seq(FunctionResource(JarResource, "j")), false, false))
comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
Seq(FunctionResource(ArchiveResource, "a")), false, false))
comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING FILE 'f'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
Seq(FunctionResource(FileResource, "f")), false, false))
comparePlans(
parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'"),
- CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+ CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
Seq(FunctionResource(JarResource, "j"),
FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
false, false))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index ef169f3a370..610fcd54b2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedIdentifier, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
@@ -536,8 +536,8 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql) match {
case create: CreateTable =>
- assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
- assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+ assert(create.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
+ assert(create.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
"mydb.table_name")
assert(create.tableSchema == new StructType()
.add("id", LongType)
@@ -567,8 +567,8 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql, withDefault = true) match {
case create: CreateTable =>
- assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
- assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+ assert(create.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
+ assert(create.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
"mydb.table_name")
assert(create.tableSchema == new StructType()
.add("id", LongType)
@@ -598,9 +598,9 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql) match {
case create: CreateTable =>
- assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+ assert(create.name.asInstanceOf[ResolvedIdentifier].catalog.name ==
CatalogManager.SESSION_CATALOG_NAME)
- assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+ assert(create.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
"mydb.page_view")
assert(create.tableSchema == new StructType()
.add("id", LongType)
@@ -628,9 +628,9 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql) match {
case ctas: CreateTableAsSelect =>
- assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+ assert(ctas.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
assert(
- ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+ ctas.name.asInstanceOf[ResolvedIdentifier].identifier.toString == "mydb.table_name"
)
assert(ctas.writeOptions.isEmpty)
assert(ctas.partitioning.isEmpty)
@@ -655,9 +655,9 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql, withDefault = true) match {
case ctas: CreateTableAsSelect =>
- assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+ assert(ctas.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
assert(
- ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+ ctas.name.asInstanceOf[ResolvedIdentifier].identifier.toString == "mydb.table_name"
)
assert(ctas.writeOptions.isEmpty)
assert(ctas.partitioning.isEmpty)
@@ -682,9 +682,9 @@ class PlanResolutionSuite extends AnalysisTest {
parseAndResolve(sql) match {
case ctas: CreateTableAsSelect =>
- assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+ assert(ctas.name.asInstanceOf[ResolvedIdentifier].catalog.name ==
CatalogManager.SESSION_CATALOG_NAME)
- assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+ assert(ctas.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
"mydb.page_view")
assert(ctas.writeOptions.isEmpty)
assert(ctas.partitioning.isEmpty)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org