Posted to commits@spark.apache.org by we...@apache.org on 2022/07/14 05:30:58 UTC

[spark] branch master updated: [SPARK-39767][SQL] Remove UnresolvedDBObjectName and add UnresolvedIdentifier

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fc157d8d490 [SPARK-39767][SQL] Remove UnresolvedDBObjectName and add UnresolvedIdentifier
fc157d8d490 is described below

commit fc157d8d4908f473ce32c2696bb9013dcaecd705
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jul 14 13:30:37 2022 +0800

    [SPARK-39767][SQL] Remove UnresolvedDBObjectName and add UnresolvedIdentifier
    
    ### What changes were proposed in this pull request?
    
    This PR removes `UnresolvedDBObjectName` and adds a new `UnresolvedIdentifier` to replace it; the same applies to the resolved plans. The motivation is that `UnresolvedDBObjectName` has an `isNamespace` flag. When the flag is true, `UnresolvedDBObjectName` is no different from `UnresolvedNamespace`, and it is confusing to have two constructs for the same purpose. If we need to resolve a namespace, we should always use `UnresolvedNamespace`.
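    
    In short, the node shapes before and after this change (a minimal sketch distilled from the diff below):
    
    ```scala
    // Before: one node with a flag that overlapped with UnresolvedNamespace.
    case class UnresolvedDBObjectName(nameParts: Seq[String], isNamespace: Boolean) extends LeafNode
    case class ResolvedDBObjectName(catalog: CatalogPlugin, nameParts: Seq[String]) extends LeafNodeWithoutStats
    
    // After: identifiers and namespaces are distinct, with no flag.
    case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode
    case class ResolvedIdentifier(catalog: CatalogPlugin, identifier: Identifier) extends LeafNodeWithoutStats
    
    // Callers now pick the node that matches the object being resolved, e.g.
    //   CreateTable(UnresolvedIdentifier(table), ...)        // was UnresolvedDBObjectName(table, isNamespace = false)
    //   CreateNamespace(UnresolvedNamespace(nameParts), ...) // was UnresolvedDBObjectName(nameParts, isNamespace = true)
    ```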
    
    ### Why are the changes needed?
    
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #37178 from cloud-fan/refactor.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 21 -----------
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 35 +++++++++---------
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 15 ++++----
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 18 ++++------
 .../sql/catalyst/plans/logical/v2Commands.scala    | 14 +++-----
 .../CreateTablePartitioningValidationSuite.scala   | 14 ++++----
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 12 +++----
 .../org/apache/spark/sql/DataFrameWriter.scala     | 11 +++---
 .../org/apache/spark/sql/DataFrameWriterV2.scala   |  6 ++--
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 42 +++++++++++-----------
 .../spark/sql/execution/SparkSqlParser.scala       | 10 ++----
 .../datasources/v2/DataSourceV2Strategy.scala      | 30 ++++++++--------
 .../apache/spark/sql/internal/CatalogImpl.scala    |  4 +--
 .../spark/sql/streaming/DataStreamWriter.scala     |  6 ++--
 .../connector/V2CommandsCaseSensitivitySuite.scala | 10 +++---
 .../command/CreateNamespaceParserSuite.scala       |  6 ++--
 .../sql/execution/command/DDLParserSuite.scala     | 22 ++++++------
 .../execution/command/PlanResolutionSuite.scala    | 26 +++++++-------
 18 files changed, 131 insertions(+), 171 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e72ebb29002..7667b4fef71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -278,7 +278,6 @@ class Analyzer(override val catalogManager: CatalogManager)
       KeepLegacyOutputs),
     Batch("Resolution", fixedPoint,
       ResolveTableValuedFunctions(v1SessionCatalog) ::
-      ResolveNamespace(catalogManager) ::
       new ResolveCatalogs(catalogManager) ::
       ResolveUserSpecifiedColumns ::
       ResolveInsertInto ::
@@ -860,26 +859,6 @@ class Analyzer(override val catalogManager: CatalogManager)
     }
   }
 
-  case class ResolveNamespace(catalogManager: CatalogManager)
-    extends Rule[LogicalPlan] with LookupCatalog {
-    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-      case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
-        s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
-      case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
-        s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
-      case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
-        s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
-      case s @ ShowFunctions(UnresolvedNamespace(Seq()), _, _, _, _) =>
-        s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
-      case a @ AnalyzeTables(UnresolvedNamespace(Seq()), _) =>
-        a.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
-      case UnresolvedNamespace(Seq()) =>
-        ResolvedNamespace(currentCatalog, Seq.empty[String])
-      case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
-        ResolvedNamespace(catalog, ns)
-    }
-  }
-
   private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty
   private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
     AnalysisContext.get.referredTempViewNames.exists { n =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 8af1d6c6023..9893384b709 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -19,29 +19,30 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
 
 /**
- * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
- * to the corresponding v2 commands if the resolved catalog is not the session catalog.
+ * Resolves the catalog of the name parts for table/view/function/namespace.
  */
 class ResolveCatalogs(val catalogManager: CatalogManager)
   extends Rule[LogicalPlan] with LookupCatalog {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case UnresolvedDBObjectName(CatalogAndNamespace(catalog, name), isNamespace) if isNamespace =>
-      ResolvedDBObjectName(catalog, name)
-
-    case UnresolvedDBObjectName(CatalogAndIdentifier(catalog, identifier), _) =>
-      ResolvedDBObjectName(catalog, identifier.namespace :+ identifier.name())
-  }
-
-  object NonSessionCatalogAndTable {
-    def unapply(nameParts: Seq[String]): Option[(CatalogPlugin, Seq[String])] = nameParts match {
-      case NonSessionCatalogAndIdentifier(catalog, ident) =>
-        Some(catalog -> ident.asMultipartIdentifier)
-      case _ => None
-    }
+    case UnresolvedIdentifier(CatalogAndIdentifier(catalog, identifier)) =>
+      ResolvedIdentifier(catalog, identifier)
+    case s @ ShowTables(UnresolvedNamespace(Seq()), _, _) =>
+      s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+    case s @ ShowTableExtended(UnresolvedNamespace(Seq()), _, _, _) =>
+      s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+    case s @ ShowViews(UnresolvedNamespace(Seq()), _, _) =>
+      s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+    case s @ ShowFunctions(UnresolvedNamespace(Seq()), _, _, _, _) =>
+      s.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+    case a @ AnalyzeTables(UnresolvedNamespace(Seq()), _) =>
+      a.copy(namespace = ResolvedNamespace(currentCatalog, catalogManager.currentNamespace))
+    case UnresolvedNamespace(Seq()) =>
+      ResolvedNamespace(currentCatalog, Seq.empty[String])
+    case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
+      ResolvedNamespace(catalog, ns)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index a87f9e0082d..6095d812d66 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -131,15 +131,15 @@ case class UnresolvedFunc(
 }
 
 /**
- * Holds the name of a database object (table, view, namespace, function, etc.) that is to be
- * created and we need to determine the catalog to store it. It will be resolved to
- * [[ResolvedDBObjectName]] during analysis.
+ * Holds the name of a table/view/function identifier that we need to determine the catalog. It will
+ * be resolved to [[ResolvedIdentifier]] during analysis.
  */
-case class UnresolvedDBObjectName(nameParts: Seq[String], isNamespace: Boolean) extends LeafNode {
+case class UnresolvedIdentifier(nameParts: Seq[String]) extends LeafNode {
   override lazy val resolved: Boolean = false
   override def output: Seq[Attribute] = Nil
 }
 
+
 /**
  * A resolved leaf node whose statistics has no meaning.
  */
@@ -225,11 +225,10 @@ case class ResolvedNonPersistentFunc(
 }
 
 /**
- * A plan containing resolved database object name with catalog determined.
+ * A plan containing resolved identifier with catalog determined.
  */
-case class ResolvedDBObjectName(
+case class ResolvedIdentifier(
     catalog: CatalogPlugin,
-    nameParts: Seq[String])
-  extends LeafNodeWithoutStats {
+    identifier: Identifier) extends LeafNodeWithoutStats {
   override def output: Seq[Attribute] = Nil
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 05b3ddca022..890c8f4000f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3193,9 +3193,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     }
 
     CreateNamespace(
-      UnresolvedDBObjectName(
-        visitMultipartIdentifier(ctx.multipartIdentifier),
-        isNamespace = true),
+      UnresolvedNamespace(visitMultipartIdentifier(ctx.multipartIdentifier)),
       ctx.EXISTS != null,
       properties)
   }
@@ -3587,16 +3585,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
           ctx)
 
       case Some(query) =>
-        CreateTableAsSelect(
-          UnresolvedDBObjectName(table, isNamespace = false),
+        CreateTableAsSelect(UnresolvedIdentifier(table),
           partitioning, query, tableSpec, Map.empty, ifNotExists)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
         // with data type.
         val schema = StructType(columns ++ partCols)
-        CreateTable(
-          UnresolvedDBObjectName(table, isNamespace = false),
+        CreateTable(UnresolvedIdentifier(table),
           schema, partitioning, tableSpec, ignoreIfExists = ifNotExists)
     }
   }
@@ -3659,16 +3655,14 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
           ctx)
 
       case Some(query) =>
-        ReplaceTableAsSelect(
-          UnresolvedDBObjectName(table, isNamespace = false),
+        ReplaceTableAsSelect(UnresolvedIdentifier(table),
           partitioning, query, tableSpec, writeOptions = Map.empty, orCreate = orCreate)
 
       case _ =>
         // Note: table schema includes both the table columns list and the partition columns
         // with data type.
         val schema = StructType(columns ++ partCols)
-        ReplaceTable(
-          UnresolvedDBObjectName(table, isNamespace = false),
+        ReplaceTable(UnresolvedIdentifier(table),
           schema, partitioning, tableSpec, orCreate = orCreate)
     }
   }
@@ -3702,7 +3696,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
    */
   override def visitUse(ctx: UseContext): LogicalPlan = withOrigin(ctx) {
     val nameParts = visitMultipartIdentifier(ctx.multipartIdentifier)
-    SetCatalogAndNamespace(UnresolvedDBObjectName(nameParts, isNamespace = true))
+    SetCatalogAndNamespace(UnresolvedNamespace(nameParts))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b1b8843aa33..fb35e37feb0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.sql.{sources, AnalysisException}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedDBObjectName, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, EliminateSubqueryAliases, FieldName, NamedRelation, PartitionSpec, ResolvedIdentifier, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.catalog.FunctionResource
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, MetadataAttribute, Unevaluable}
@@ -275,13 +275,12 @@ case class CreateTable(
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
     ignoreIfExists: Boolean) extends UnaryCommand with V2CreateTablePlan {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
   override def child: LogicalPlan = name
 
   override def tableName: Identifier = {
     assert(child.resolved)
-    child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+    child.asInstanceOf[ResolvedIdentifier].identifier
   }
 
   override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
@@ -302,7 +301,6 @@ case class CreateTableAsSelect(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     ignoreIfExists: Boolean) extends BinaryCommand with V2CreateTablePlan {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
   override def tableSchema: StructType = query.schema
   override def left: LogicalPlan = name
@@ -310,7 +308,7 @@ case class CreateTableAsSelect(
 
   override def tableName: Identifier = {
     assert(left.resolved)
-    left.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+    left.asInstanceOf[ResolvedIdentifier].identifier
   }
 
   override lazy val resolved: Boolean = childrenResolved && {
@@ -345,13 +343,12 @@ case class ReplaceTable(
     partitioning: Seq[Transform],
     tableSpec: TableSpec,
     orCreate: Boolean) extends UnaryCommand with V2CreateTablePlan {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
   override def child: LogicalPlan = name
 
   override def tableName: Identifier = {
     assert(child.resolved)
-    child.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+    child.asInstanceOf[ResolvedIdentifier].identifier
   }
 
   override protected def withNewChildInternal(newChild: LogicalPlan): V2CreateTablePlan =
@@ -375,7 +372,6 @@ case class ReplaceTableAsSelect(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean) extends BinaryCommand with V2CreateTablePlan {
-  import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
 
   override def tableSchema: StructType = query.schema
   override def left: LogicalPlan = name
@@ -390,7 +386,7 @@ case class ReplaceTableAsSelect(
 
   override def tableName: Identifier = {
     assert(name.resolved)
-    name.asInstanceOf[ResolvedDBObjectName].nameParts.asIdentifier
+    name.asInstanceOf[ResolvedIdentifier].identifier
   }
 
   override protected def withNewChildrenInternal(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
index ced83b31c7f..941d0209ea6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/CreateTablePartitioningValidationSuite.scala
@@ -32,7 +32,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "does_not_exist") :: Nil,
       TestRelation2,
       tableSpec,
@@ -49,7 +49,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "does_not_exist.z") :: Nil,
       TestRelation2,
       tableSpec,
@@ -66,7 +66,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "point.z") :: Nil,
       TestRelation2,
       tableSpec,
@@ -83,7 +83,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "does_not_exist", "point.z") :: Nil,
       TestRelation2,
       tableSpec,
@@ -101,7 +101,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "id") :: Nil,
       TestRelation2,
       tableSpec,
@@ -115,7 +115,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "point.x") :: Nil,
       TestRelation2,
       tableSpec,
@@ -129,7 +129,7 @@ class CreateTablePartitioningValidationSuite extends AnalysisTest {
     val tableSpec = TableSpec(Map.empty, None, Map.empty,
       None, None, None, false)
     val plan = CreateTableAsSelect(
-      UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+      UnresolvedIdentifier(Array("table_name")),
       Expressions.bucket(4, "point") :: Nil,
       TestRelation2,
       tableSpec,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index f06377f36a8..eb3e9baaacd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2097,7 +2097,7 @@ class DDLParserSuite extends AnalysisTest {
       plan match {
         case create: CreateTable =>
           TableSpec(
-            create.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+            create.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(create.tableSchema),
             create.partitioning,
             create.tableSpec.properties,
@@ -2109,7 +2109,7 @@ class DDLParserSuite extends AnalysisTest {
             create.tableSpec.external)
         case replace: ReplaceTable =>
           TableSpec(
-            replace.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+            replace.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(replace.tableSchema),
             replace.partitioning,
             replace.tableSpec.properties,
@@ -2120,7 +2120,7 @@ class DDLParserSuite extends AnalysisTest {
             replace.tableSpec.serde)
         case ctas: CreateTableAsSelect =>
           TableSpec(
-            ctas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+            ctas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(ctas.query).filter(_.resolved).map(_.schema),
             ctas.partitioning,
             ctas.tableSpec.properties,
@@ -2132,7 +2132,7 @@ class DDLParserSuite extends AnalysisTest {
             ctas.tableSpec.external)
         case rtas: ReplaceTableAsSelect =>
           TableSpec(
-            rtas.name.asInstanceOf[UnresolvedDBObjectName].nameParts,
+            rtas.name.asInstanceOf[UnresolvedIdentifier].nameParts,
             Some(rtas.query).filter(_.resolved).map(_.schema),
             rtas.partitioning,
             rtas.tableSpec.properties,
@@ -2224,12 +2224,12 @@ class DDLParserSuite extends AnalysisTest {
           .putString(ResolveDefaultColumns.EXISTS_DEFAULT_COLUMN_METADATA_KEY, "\"abc\"").build())
     comparePlans(parsePlan(
       "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
-      CreateTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
+      CreateTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
         Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
           Map.empty[String, String], None, None, None, false), false))
     comparePlans(parsePlan("REPLACE TABLE my_tab(a INT, " +
       "b STRING NOT NULL DEFAULT \"abc\") USING parquet"),
-      ReplaceTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
+      ReplaceTable(UnresolvedIdentifier(Seq("my_tab")), schemaWithDefaultColumn,
         Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
           Map.empty[String, String], None, None, None, false), false))
     // THese ALTER TABLE statements should parse successfully.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index e5adab866d9..7e11b8b6d36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedDBObjectName, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, InsertIntoStatement, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
@@ -336,10 +336,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
                 external = false)
               runCommand(df.sparkSession) {
                 CreateTableAsSelect(
-                  UnresolvedDBObjectName(
-                    catalog.name +: ident.namespace.toSeq :+ ident.name,
-                    isNamespace = false
-                  ),
+                  UnresolvedIdentifier(catalog.name +: ident.namespace.toSeq :+ ident.name),
                   partitioningAsV2,
                   df.queryExecution.analyzed,
                   tableSpec,
@@ -603,7 +600,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           serde = None,
           external = false)
         ReplaceTableAsSelect(
-          UnresolvedDBObjectName(nameParts, isNamespace = false),
+          UnresolvedIdentifier(nameParts),
           partitioningAsV2,
           df.queryExecution.analyzed,
           tableSpec,
@@ -624,7 +621,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           external = false)
 
         CreateTableAsSelect(
-          UnresolvedDBObjectName(nameParts, isNamespace = false),
+          UnresolvedIdentifier(nameParts),
           partitioningAsV2,
           df.queryExecution.analyzed,
           tableSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
index 93127e6288a..270bf0a948e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedDBObjectName, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Bucket, Days, Hours, Literal, Months, Years}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.connector.expressions.{LogicalExpressions, NamedReference, Transform}
@@ -117,7 +117,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
       external = false)
     runCommand(
       CreateTableAsSelect(
-        UnresolvedDBObjectName(tableName, isNamespace = false),
+        UnresolvedIdentifier(tableName),
         partitioning.getOrElse(Seq.empty),
         logicalPlan,
         tableSpec,
@@ -205,7 +205,7 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T])
       serde = None,
       external = false)
     runCommand(ReplaceTableAsSelect(
-      UnresolvedDBObjectName(tableName, isNamespace = false),
+      UnresolvedIdentifier(tableName),
       partitioning.getOrElse(Seq.empty),
       logicalPlan,
       tableSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 3e39863f5bb..a495e35bf2c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -151,19 +151,19 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For CREATE TABLE [AS SELECT], we should use the v1 command if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ CreateTable(ResolvedDBObjectName(catalog, name), _, _, _, _)
+    case c @ CreateTable(ResolvedIdentifier(catalog, ident), _, _, _, _)
         if isSessionCatalog(catalog) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider, c.tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
         ctas = false)
       if (!isV2Provider(provider)) {
-        constructV1TableCmd(None, c.tableSpec, name, c.tableSchema, c.partitioning,
+        constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
           c.ignoreIfExists, storageFormat, provider)
       } else {
         c
       }
 
-    case c @ CreateTableAsSelect(ResolvedDBObjectName(catalog, name), _, _, _, writeOptions, _)
+    case c @ CreateTableAsSelect(ResolvedIdentifier(catalog, ident), _, _, _, writeOptions, _)
         if isSessionCatalog(catalog) =>
       val (storageFormat, provider) = getStorageFormatAndProvider(
         c.tableSpec.provider,
@@ -173,7 +173,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         ctas = true)
 
       if (!isV2Provider(provider)) {
-        constructV1TableCmd(Some(c.query), c.tableSpec, name, new StructType, c.partitioning,
+        constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning,
           c.ignoreIfExists, storageFormat, provider)
       } else {
         c
@@ -187,7 +187,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
-    case c @ ReplaceTable(ResolvedDBObjectName(catalog, _), _, _, _, _)
+    case c @ ReplaceTable(ResolvedIdentifier(catalog, _), _, _, _, _)
         if isSessionCatalog(catalog) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
@@ -196,7 +196,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         c
       }
 
-    case c @ ReplaceTableAsSelect(ResolvedDBObjectName(catalog, _), _, _, _, _, _)
+    case c @ ReplaceTableAsSelect(ResolvedIdentifier(catalog, _), _, _, _, _, _)
         if isSessionCatalog(catalog) =>
       val provider = c.tableSpec.provider.getOrElse(conf.defaultDataSourceName)
       if (!isV2Provider(provider)) {
@@ -358,11 +358,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         originalText,
         query)
 
-    case CreateView(ResolvedDBObjectName(catalog, nameParts), userSpecifiedColumns, comment,
+    case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
         properties, originalText, child, allowExisting, replace) =>
       if (isSessionCatalog(catalog)) {
         CreateViewCommand(
-          name = attachSessionCatalog(nameParts.asTableIdentifier),
+          name = attachSessionCatalog(ident.asTableIdentifier),
           userSpecifiedColumns = userSpecifiedColumns,
           comment = comment,
           properties = properties,
@@ -419,17 +419,17 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
       }
 
-    case CreateFunction(ResolvedDBObjectName(catalog, nameParts),
+    case CreateFunction(ResolvedIdentifier(catalog, ident),
         className, resources, ignoreIfExists, replace) =>
       if (isSessionCatalog(catalog)) {
-        val database = if (nameParts.length > 2) {
-          throw QueryCompilationErrors.requiresSinglePartNamespaceError(nameParts)
-        } else if (nameParts.length == 2) {
-          Some(nameParts.head)
+        val database = if (ident.namespace().length > 1) {
+          throw QueryCompilationErrors.requiresSinglePartNamespaceError(ident.asMultipartIdentifier)
+        } else if (ident.namespace().length == 1) {
+          Some(ident.namespace().head)
         } else {
           None
         }
-        val identifier = FunctionIdentifier(nameParts.last, database)
+        val identifier = FunctionIdentifier(ident.name(), database)
         CreateFunctionCommand(
           identifier,
           className,
@@ -445,13 +445,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   private def constructV1TableCmd(
       query: Option[LogicalPlan],
       tableSpec: TableSpec,
-      name: Seq[String],
+      ident: Identifier,
       tableSchema: StructType,
       partitioning: Seq[Transform],
       ignoreIfExists: Boolean,
       storageFormat: CatalogStorageFormat,
       provider: String): CreateTableV1 = {
-    val tableDesc = buildCatalogTable(attachSessionCatalog(name.asTableIdentifier), tableSchema,
+    val tableDesc = buildCatalogTable(attachSessionCatalog(ident.asTableIdentifier), tableSchema,
         partitioning, tableSpec.properties, provider,
         tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
     val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
@@ -628,12 +628,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   private object DatabaseNameInSessionCatalog {
-    def unapply(resolved: ResolvedDBObjectName): Option[String] = resolved match {
-      case ResolvedDBObjectName(catalog, _) if !isSessionCatalog(catalog) => None
-      case ResolvedDBObjectName(_, Seq(dbName)) => Some(dbName)
+    def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
+      case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
       case _ =>
-        assert(resolved.nameParts.length > 1)
-        throw QueryCompilationErrors.invalidDatabaseNameError(resolved.nameParts.quoted)
+        assert(resolved.namespace.length > 1)
+        throw QueryCompilationErrors.invalidDatabaseNameError(resolved.namespace.quoted)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a8f7b88625b..a82fc47f427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -27,7 +27,7 @@ import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode
 
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedDBObjectName, UnresolvedFunc}
+import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, PersistedView, UnresolvedFunc, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.parser._
@@ -485,9 +485,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       assert(Option(originalText).isDefined,
         "'originalText' must be provided to create permanent view")
       CreateView(
-        UnresolvedDBObjectName(
-          visitMultipartIdentifier(ctx.multipartIdentifier),
-          false),
+        UnresolvedIdentifier(visitMultipartIdentifier(ctx.multipartIdentifier)),
         userSpecifiedColumns,
         visitCommentSpecList(ctx.commentSpec()),
         properties,
@@ -549,9 +547,7 @@ class SparkSqlAstBuilder extends AstBuilder {
     val functionIdentifier = visitMultipartIdentifier(ctx.multipartIdentifier)
     if (ctx.TEMPORARY == null) {
       CreateFunction(
-        UnresolvedDBObjectName(
-          functionIdentifier,
-          isNamespace = false),
+        UnresolvedIdentifier(functionIdentifier),
         string(ctx.className),
         resources.toSeq,
         ctx.EXISTS != null,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 93445eea51b..16c6b331d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedDBObjectName, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedPartitionSpec, ResolvedTable}
 import org.apache.spark.sql.catalyst.catalog.CatalogUtils
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruning, Expression, NamedExpression, Not, Or, PredicateHelper, SubqueryExpression}
@@ -169,50 +169,50 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
       WriteToDataSourceV2Exec(writer, invalidateCacheFunc, planLater(query), customMetrics) :: Nil
 
-    case CreateTable(ResolvedDBObjectName(catalog, ident), schema, partitioning,
+    case CreateTable(ResolvedIdentifier(catalog, ident), schema, partitioning,
         tableSpec, ifNotExists) =>
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE")
-      CreateTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema,
+      CreateTableExec(catalog.asTableCatalog, ident, newSchema,
         partitioning, qualifyLocInTableSpec(tableSpec), ifNotExists) :: Nil
 
-    case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
+    case CreateTableAsSelect(ResolvedIdentifier(catalog, ident), parts, query, tableSpec,
         options, ifNotExists) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query),
+          AtomicCreateTableAsSelectExec(staging, ident, parts, query, planLater(query),
             qualifyLocInTableSpec(tableSpec), writeOptions, ifNotExists) :: Nil
         case _ =>
-          CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query,
+          CreateTableAsSelectExec(catalog.asTableCatalog, ident, parts, query,
             planLater(query), qualifyLocInTableSpec(tableSpec), writeOptions, ifNotExists) :: Nil
       }
 
     case RefreshTable(r: ResolvedTable) =>
       RefreshTableExec(r.catalog, r.identifier, recacheTable(r)) :: Nil
 
-    case ReplaceTable(ResolvedDBObjectName(catalog, ident), schema, parts, tableSpec, orCreate) =>
+    case ReplaceTable(ResolvedIdentifier(catalog, ident), schema, parts, tableSpec, orCreate) =>
       val newSchema: StructType =
         ResolveDefaultColumns.constantFoldCurrentDefaultsToExistDefaults(
           schema, tableSpec.provider, "CREATE TABLE")
       catalog match {
         case staging: StagingTableCatalog =>
-          AtomicReplaceTableExec(staging, ident.asIdentifier, newSchema, parts,
+          AtomicReplaceTableExec(staging, ident, newSchema, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
         case _ =>
-          ReplaceTableExec(catalog.asTableCatalog, ident.asIdentifier, newSchema, parts,
+          ReplaceTableExec(catalog.asTableCatalog, ident, newSchema, parts,
             qualifyLocInTableSpec(tableSpec), orCreate = orCreate, invalidateCache) :: Nil
       }
 
-    case ReplaceTableAsSelect(ResolvedDBObjectName(catalog, ident),
+    case ReplaceTableAsSelect(ResolvedIdentifier(catalog, ident),
         parts, query, tableSpec, options, orCreate) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
             staging,
-            ident.asIdentifier,
+            ident,
             parts,
             query,
             planLater(query),
@@ -223,7 +223,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         case _ =>
           ReplaceTableAsSelectExec(
             catalog.asTableCatalog,
-            ident.asIdentifier,
+            ident,
             parts,
             query,
             planLater(query),
@@ -352,11 +352,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         ns,
         Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
 
-    case CreateNamespace(ResolvedDBObjectName(catalog, name), ifNotExists, properties) =>
+    case CreateNamespace(ResolvedNamespace(catalog, ns), ifNotExists, properties) =>
       val finalProperties = properties.get(SupportsNamespaces.PROP_LOCATION).map { loc =>
         properties + (SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(loc))
       }.getOrElse(properties)
-      CreateNamespaceExec(catalog.asNamespaceCatalog, name, ifNotExists, finalProperties) :: Nil
+      CreateNamespaceExec(catalog.asNamespaceCatalog, ns, ifNotExists, finalProperties) :: Nil
 
     case DropNamespace(ResolvedNamespace(catalog, ns), ifExists, cascade) =>
       DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil
@@ -367,7 +367,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
       ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
 
-    case SetCatalogAndNamespace(ResolvedDBObjectName(catalog, ns)) =>
+    case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns)) =>
       val catalogManager = session.sessionState.catalogManager
       val namespace = if (ns.nonEmpty) Some(ns) else None
       SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 880c084ab6f..8ca11f620a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,7 +23,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedFunc, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
@@ -657,7 +657,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       external = tableType == CatalogTableType.EXTERNAL)
 
     val plan = CreateTable(
-      name = UnresolvedDBObjectName(ident, isNamespace = false),
+      name = UnresolvedIdentifier(ident),
       tableSchema = schema,
       partitioning = Seq(),
       tableSpec = tableSpec,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 0084ca53a26..f1d2e351ebd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedDBObjectName
+import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, TableSpec}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes
@@ -298,9 +298,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
         None,
         false)
       val cmd = CreateTable(
-        UnresolvedDBObjectName(
-          originalMultipartIdentifier,
-          isNamespace = false),
+        UnresolvedIdentifier(originalMultipartIdentifier),
         df.schema.asNullable,
         partitioningColumns.getOrElse(Nil).asTransforms.toSeq,
         tableSpec,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index b6a8480814e..08de29be27e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.connector
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedDBObjectName, UnresolvedFieldName, UnresolvedFieldPosition}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, TableSpec}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -49,7 +49,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
           val tableSpec = TableSpec(Map.empty, None, Map.empty,
             None, None, None, false)
           val plan = CreateTableAsSelect(
-            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+            UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
             TestRelation2,
             tableSpec,
@@ -73,7 +73,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
           val tableSpec = TableSpec(Map.empty, None, Map.empty,
             None, None, None, false)
           val plan = CreateTableAsSelect(
-            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+            UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,
             TestRelation2,
             tableSpec,
@@ -98,7 +98,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
           val tableSpec = TableSpec(Map.empty, None, Map.empty,
             None, None, None, false)
           val plan = ReplaceTableAsSelect(
-            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+            UnresolvedIdentifier(Array("table_name")),
             Expressions.identity(ref) :: Nil,
             TestRelation2,
             tableSpec,
@@ -122,7 +122,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
           val tableSpec = TableSpec(Map.empty, None, Map.empty,
             None, None, None, false)
           val plan = ReplaceTableAsSelect(
-            UnresolvedDBObjectName(Array("table_name"), isNamespace = false),
+            UnresolvedIdentifier(Array("table_name")),
             Expressions.bucket(4, ref) :: Nil,
             TestRelation2,
             tableSpec,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala
index 6c59512148a..6a2a8647961 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/CreateNamespaceParserSuite.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedDBObjectName}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedNamespace}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.plans.logical.CreateNamespace
 
 class CreateNamespaceParserSuite extends AnalysisTest {
   test("create namespace -- backward compatibility with DATABASE/DBPROPERTIES") {
     val expected = CreateNamespace(
-      UnresolvedDBObjectName(Seq("a", "b", "c"), true),
+      UnresolvedNamespace(Seq("a", "b", "c")),
       ifNotExists = true,
       Map(
         "a" -> "a",
@@ -98,7 +98,7 @@ class CreateNamespaceParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan(sql),
       CreateNamespace(
-        UnresolvedDBObjectName(Seq("a", "b", "c"), true),
+        UnresolvedNamespace(Seq("a", "b", "c")),
         ifNotExists = false,
         Map(
           "a" -> "1",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 05378e32296..49059fe9ef4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command
 import java.util.Locale
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedDBObjectName, UnresolvedFunc}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, GlobalTempView, LocalTempView, UnresolvedAttribute, UnresolvedFunc, UnresolvedIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{ArchiveResource, FileResource, FunctionResource, JarResource}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
@@ -314,7 +314,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
     val parsed1 = parser.parsePlan(v1)
 
     val expected1 = CreateView(
-      UnresolvedDBObjectName(Seq("view1"), false),
+      UnresolvedIdentifier(Seq("view1")),
       Seq.empty[(String, Option[String])],
       None,
       Map.empty[String, String],
@@ -354,7 +354,7 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
       """.stripMargin
     val parsed1 = parser.parsePlan(v1)
     val expected1 = CreateView(
-      UnresolvedDBObjectName(Seq("view1"), false),
+      UnresolvedIdentifier(Seq("view1")),
       Seq("col1" -> None, "col3" -> Some("hello")),
       Some("BLABLA"),
       Map("prop1Key" -> "prop1Val"),
@@ -410,35 +410,35 @@ class DDLParserSuite extends AnalysisTest with SharedSparkSession {
 
   test("CREATE FUNCTION") {
     comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun", Seq(), false, false))
+      CreateFunction(UnresolvedIdentifier(Seq("a")), "fun", Seq(), false, false))
 
     comparePlans(parser.parsePlan("CREATE FUNCTION a.b.c as 'fun'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, false))
+      CreateFunction(UnresolvedIdentifier(Seq("a", "b", "c")), "fun", Seq(), false, false))
 
     comparePlans(parser.parsePlan("CREATE OR REPLACE FUNCTION a.b.c as 'fun'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), false, true))
+      CreateFunction(UnresolvedIdentifier(Seq("a", "b", "c")), "fun", Seq(), false, true))
 
     comparePlans(parser.parsePlan("CREATE TEMPORARY FUNCTION a as 'fun'"),
       CreateFunctionCommand(Seq("a").asFunctionIdentifier, "fun", Seq(), true, false, false))
 
     comparePlans(parser.parsePlan("CREATE FUNCTION IF NOT EXISTS a.b.c as 'fun'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a", "b", "c"), false), "fun", Seq(), true, false))
+      CreateFunction(UnresolvedIdentifier(Seq("a", "b", "c")), "fun", Seq(), true, false))
 
     comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+      CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
         Seq(FunctionResource(JarResource, "j")), false, false))
 
     comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING ARCHIVE 'a'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+      CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
         Seq(FunctionResource(ArchiveResource, "a")), false, false))
 
     comparePlans(parser.parsePlan("CREATE FUNCTION a as 'fun' USING FILE 'f'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+      CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
         Seq(FunctionResource(FileResource, "f")), false, false))
 
     comparePlans(
       parser.parsePlan("CREATE FUNCTION a as 'fun' USING JAR 'j', ARCHIVE 'a', FILE 'f'"),
-      CreateFunction(UnresolvedDBObjectName(Seq("a"), false), "fun",
+      CreateFunction(UnresolvedIdentifier(Seq("a")), "fun",
         Seq(FunctionResource(JarResource, "j"),
           FunctionResource(ArchiveResource, "a"), FunctionResource(FileResource, "f")),
         false, false))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index ef169f3a370..610fcd54b2a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -26,7 +26,7 @@ import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedDBObjectName, ResolvedFieldName, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, AnalysisTest, Analyzer, EmptyFunctionRegistry, NoSuchTableException, ResolvedFieldName, ResolvedIdentifier, ResolvedTable, ResolveSessionCatalog, UnresolvedAttribute, UnresolvedInlineTable, UnresolvedRelation, UnresolvedSubqueryColumnAliases, UnresolvedTable}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
@@ -536,8 +536,8 @@ class PlanResolutionSuite extends AnalysisTest {
 
     parseAndResolve(sql) match {
       case create: CreateTable =>
-        assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
-        assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+        assert(create.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
+        assert(create.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
           "mydb.table_name")
         assert(create.tableSchema == new StructType()
             .add("id", LongType)
@@ -567,8 +567,8 @@ class PlanResolutionSuite extends AnalysisTest {
 
     parseAndResolve(sql, withDefault = true) match {
       case create: CreateTable =>
-        assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
-        assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+        assert(create.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
+        assert(create.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
           "mydb.table_name")
         assert(create.tableSchema == new StructType()
             .add("id", LongType)
@@ -598,9 +598,9 @@ class PlanResolutionSuite extends AnalysisTest {
 
     parseAndResolve(sql) match {
       case create: CreateTable =>
-        assert(create.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+        assert(create.name.asInstanceOf[ResolvedIdentifier].catalog.name ==
           CatalogManager.SESSION_CATALOG_NAME)
-        assert(create.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+        assert(create.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
           "mydb.page_view")
         assert(create.tableSchema == new StructType()
             .add("id", LongType)
@@ -628,9 +628,9 @@ class PlanResolutionSuite extends AnalysisTest {
 
     parseAndResolve(sql) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(ctas.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
         assert(
-          ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+          ctas.name.asInstanceOf[ResolvedIdentifier].identifier.toString == "mydb.table_name"
         )
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
@@ -655,9 +655,9 @@ class PlanResolutionSuite extends AnalysisTest {
 
     parseAndResolve(sql, withDefault = true) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name == "testcat")
+        assert(ctas.name.asInstanceOf[ResolvedIdentifier].catalog.name == "testcat")
         assert(
-          ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") == "mydb.table_name"
+          ctas.name.asInstanceOf[ResolvedIdentifier].identifier.toString == "mydb.table_name"
         )
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)
@@ -682,9 +682,9 @@ class PlanResolutionSuite extends AnalysisTest {
 
     parseAndResolve(sql) match {
       case ctas: CreateTableAsSelect =>
-        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].catalog.name ==
+        assert(ctas.name.asInstanceOf[ResolvedIdentifier].catalog.name ==
           CatalogManager.SESSION_CATALOG_NAME)
-        assert(ctas.name.asInstanceOf[ResolvedDBObjectName].nameParts.mkString(".") ==
+        assert(ctas.name.asInstanceOf[ResolvedIdentifier].identifier.toString ==
           "mydb.page_view")
         assert(ctas.writeOptions.isEmpty)
         assert(ctas.partitioning.isEmpty)

