Posted to commits@spark.apache.org by we...@apache.org on 2022/08/11 05:17:29 UTC

[spark] branch master updated: [SPARK-40020][SQL] Centralize the code of qualifying identifiers in SessionCatalog

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new abaec2804fb [SPARK-40020][SQL] Centralize the code of qualifying identifiers in SessionCatalog
abaec2804fb is described below

commit abaec2804fb48d2d378f7c3e99733af97283fa22
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Aug 11 13:16:48 2022 +0800

    [SPARK-40020][SQL] Centralize the code of qualifying identifiers in SessionCatalog
    
    ### What changes were proposed in this pull request?
    
    This PR is a refactoring of `SessionCatalog`. It centralizes the code for qualifying identifiers in one place, `SessionCatalog.qualifyIdentifier`, which can then be called from the places (mostly in `SessionCatalog`) that previously fetched the current database and formatted names manually.
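
    A minimal sketch of the change in call pattern inside `SessionCatalog` (method names are taken
    from this diff; the snippet is illustrative, not a compilable unit on its own):

        // Before: each call site fetched the current database and formatted names by hand.
        val db    = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
        val table = formatTableName(name.table)

        // After: one helper returns an identifier with catalog, database and table all normalized.
        val qualifiedIdent = qualifyIdentifier(name)
        val db2    = qualifiedIdent.database.get   // always defined after qualification
        val table2 = qualifiedIdent.table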
    
    Attaching the v1 catalog name is also a form of identifier qualification, so this PR folds it into `SessionCatalog.qualifyIdentifier` as well. This simplifies the code of https://github.com/apache/spark/pull/37021 : now we only need to attach the v1 catalog name in one place, instead of in many places spread across the codebase.
    
    This also improves v1 command resolution: `ResolveSessionCatalog` now uses fully qualified identifiers when falling back to v1 commands, whereas before it only attached the v1 catalog name. This avoids potential issues where v1 command execution could qualify the identifiers differently (e.g. if the current database changes in the meantime).
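
    For illustration, assuming a default session state (current database `default`, session catalog
    name `spark_catalog`, legacy output-catalog flag off) and an assumed `SessionCatalog` instance
    named `sessionCatalog`, qualification roughly behaves like:

        // Hypothetical example; exact values depend on session configuration.
        val ident     = TableIdentifier("t")               // no database, no catalog
        val qualified = sessionCatalog.qualifyIdentifier(ident)
        // qualified == TableIdentifier("t", Some("default"), Some("spark_catalog"))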
    
    ### Why are the changes needed?
    
    code cleanup
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #37415 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   4 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      | 424 +++++++++++----------
 .../apache/spark/sql/catalyst/identifiers.scala    |  27 +-
 .../sql/connector/catalog/CatalogV2Implicits.scala |   7 +-
 .../sql/connector/catalog/LookupCatalog.scala      |   5 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala    |  13 +-
 .../sql/catalyst/catalog/SessionCatalogSuite.scala |  39 +-
 .../sql/connector/catalog/LookupCatalogSuite.scala |   4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala     |  22 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 150 ++++----
 .../spark/sql/execution/command/functions.scala    |  10 +-
 .../spark/sql/execution/command/tables.scala       |   3 +-
 .../apache/spark/sql/execution/command/views.scala |  20 +-
 .../sql-tests/results/show-create-table.sql.out    |  12 +-
 .../test/resources/sql-tests/results/udaf.sql.out  |   2 +-
 .../sql-tests/results/udf/udf-udaf.sql.out         |   2 +-
 .../apache/spark/sql/execution/SQLViewSuite.scala  |   2 +-
 .../spark/sql/execution/command/DDLSuite.scala     |  11 +-
 .../execution/command/PlanResolutionSuite.scala    |  38 +-
 .../execution/command/v1/ShowFunctionsSuite.scala  |   6 +
 .../spark/sql/hive/client/HiveClientImpl.scala     |   5 +-
 .../apache/spark/sql/hive/client/HiveShim.scala    |   5 +-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |  15 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |   4 +-
 .../execution/command/ShowFunctionsSuite.scala     |   6 -
 .../spark/sql/sources/HadoopFsRelationTest.scala   |   3 +-
 26 files changed, 425 insertions(+), 414 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a6108c2a3d3..aa177bcbcc8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1142,7 +1142,9 @@ class Analyzer(override val catalogManager: CatalogManager)
             CatalogV2Util.loadTable(catalog, ident).map {
               case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
                 v1Table.v1Table.tableType == CatalogTableType.VIEW =>
-                ResolvedView(ident, isTemp = false)
+                val v1Ident = v1Table.catalogTable.identifier
+                val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
+                ResolvedView(v2Ident, isTemp = false)
               case table =>
                 ResolvedTable.create(catalog.asTableCatalog, ident, table)
             }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a0c98aac6c4..77da74ebc94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionInfo, UpCast}
@@ -131,7 +130,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
+  protected var currentDb: String = format(DEFAULT_DATABASE)
 
   private val validNameFormat = "([\\w_]+)".r
 
@@ -149,17 +148,48 @@ class SessionCatalog(
   }
 
   /**
-   * Format table name, taking into account case sensitivity.
+   * Formats object names, taking into account case sensitivity.
    */
-  protected[this] def formatTableName(name: String): String = {
+  protected def format(name: String): String = {
     if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
   }
 
   /**
-   * Format database name, taking into account case sensitivity.
+   * Qualifies the table identifier with the current database if not specified, and normalizes all
+   * the names.
    */
-  protected[this] def formatDatabaseName(name: String): String = {
-    if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+  def qualifyIdentifier(ident: TableIdentifier): TableIdentifier = {
+    TableIdentifier(
+      table = format(ident.table),
+      database = getDatabase(ident),
+      catalog = getCatalog(ident))
+  }
+
+  /**
+   * Qualifies the function identifier with the current database if not specified, and normalizes all
+   * the names.
+   */
+  def qualifyIdentifier(ident: FunctionIdentifier): FunctionIdentifier = {
+    FunctionIdentifier(
+      funcName = format(ident.funcName),
+      database = getDatabase(ident),
+      catalog = getCatalog(ident))
+  }
+
+  private def attachCatalogName(table: CatalogTable): CatalogTable = {
+    table.copy(identifier = table.identifier.copy(catalog = getCatalog(table.identifier)))
+  }
+
+  private def getDatabase(ident: CatalystIdentifier): Option[String] = {
+    Some(format(ident.database.getOrElse(getCurrentDatabase)))
+  }
+
+  private def getCatalog(ident: CatalystIdentifier): Option[String] = {
+    if (conf.getConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME)) {
+      ident.catalog
+    } else {
+      Some(format(ident.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME)))
+    }
   }
 
   private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
@@ -195,9 +225,8 @@ class SessionCatalog(
 
   /** This method discards any cached table relation plans for the given table identifier. */
   def invalidateCachedTable(name: TableIdentifier): Unit = {
-    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
-    val tableName = formatTableName(name.table)
-    invalidateCachedTable(QualifiedTableName(dbName, tableName))
+    val qualified = qualifyIdentifier(name)
+    invalidateCachedTable(QualifiedTableName(qualified.database.get, qualified.table))
   }
 
   /** This method provides a way to invalidate all the cached plans. */
@@ -223,15 +252,13 @@ class SessionCatalog(
 
   private def requireTableExists(name: TableIdentifier): Unit = {
     if (!tableExists(name)) {
-      val db = name.database.getOrElse(currentDb)
-      throw new NoSuchTableException(db = db, table = name.table)
+      throw new NoSuchTableException(db = name.database.get, table = name.table)
     }
   }
 
   private def requireTableNotExists(name: TableIdentifier): Unit = {
     if (tableExists(name)) {
-      val db = name.database.getOrElse(currentDb)
-      throw new TableAlreadyExistsException(db = db, table = name.table)
+      throw new TableAlreadyExistsException(db = name.database.get, table = name.table)
     }
   }
 
@@ -242,7 +269,7 @@ class SessionCatalog(
   // ----------------------------------------------------------------------------
 
   def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
-    val dbName = formatDatabaseName(dbDefinition.name)
+    val dbName = format(dbDefinition.name)
     if (dbName == globalTempViewManager.database) {
       throw QueryCompilationErrors.cannotCreateDatabaseWithSameNameAsPreservedDatabaseError(
         globalTempViewManager.database)
@@ -258,7 +285,7 @@ class SessionCatalog(
   }
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     if (dbName == DEFAULT_DATABASE) {
       throw QueryCompilationErrors.cannotDropDefaultDatabaseError
     }
@@ -274,20 +301,20 @@ class SessionCatalog(
   }
 
   def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
-    val dbName = formatDatabaseName(dbDefinition.name)
+    val dbName = format(dbDefinition.name)
     requireDbExists(dbName)
     externalCatalog.alterDatabase(dbDefinition.copy(
       name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
   }
 
   def getDatabaseMetadata(db: String): CatalogDatabase = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     requireDbExists(dbName)
     externalCatalog.getDatabase(dbName)
   }
 
   def databaseExists(db: String): Boolean = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     externalCatalog.databaseExists(dbName)
   }
 
@@ -302,7 +329,7 @@ class SessionCatalog(
   def getCurrentDatabase: String = synchronized { currentDb }
 
   def setCurrentDatabase(db: String): Unit = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     if (dbName == globalTempViewManager.database) {
       throw QueryCompilationErrors.cannotUsePreservedDatabaseAsCurrentDatabaseError(
         globalTempViewManager.database)
@@ -316,7 +343,7 @@ class SessionCatalog(
    * by users.
    */
   def getDefaultDBPath(db: String): URI = {
-    CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
+    CatalogUtils.stringToURI(format(db) + ".db")
   }
 
   // ----------------------------------------------------------------------------
@@ -345,9 +372,9 @@ class SessionCatalog(
       throw QueryCompilationErrors.createExternalTableWithoutLocationError
     }
 
-    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableDefinition.identifier.table)
-    val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+    val qualifiedIdent = qualifyIdentifier(tableDefinition.identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     validateName(table)
 
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
@@ -357,9 +384,9 @@ class SessionCatalog(
         makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
-        identifier = tableIdentifier)
+        identifier = qualifiedIdent)
     } else {
-      tableDefinition.copy(identifier = tableIdentifier)
+      tableDefinition.copy(identifier = qualifiedIdent)
     }
 
     requireDbExists(db)
@@ -395,7 +422,7 @@ class SessionCatalog(
     } else if (new Path(locationUri).isAbsolute) {
       makeQualifiedPath(locationUri)
     } else {
-      val dbName = formatDatabaseName(database)
+      val dbName = format(database)
       val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
       new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
     }
@@ -411,11 +438,10 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterTable(tableDefinition: CatalogTable): Unit = {
-    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableDefinition.identifier.table)
-    val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+    val qualifiedIdent = qualifyIdentifier(tableDefinition.identifier)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(tableIdentifier)
+    requireTableExists(qualifiedIdent)
     val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
       && !tableDefinition.storage.locationUri.get.isAbsolute) {
       // make the location of the table qualified.
@@ -423,9 +449,9 @@ class SessionCatalog(
         makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
       tableDefinition.copy(
         storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
-        identifier = tableIdentifier)
+        identifier = qualifiedIdent)
     } else {
-      tableDefinition.copy(identifier = tableIdentifier)
+      tableDefinition.copy(identifier = qualifiedIdent)
     }
 
     externalCatalog.alterTable(newTableDefinition)
@@ -442,11 +468,11 @@ class SessionCatalog(
   def alterTableDataSchema(
       identifier: TableIdentifier,
       newDataSchema: StructType): Unit = {
-    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(identifier.table)
-    val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+    val qualifiedIdent = qualifyIdentifier(identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     requireDbExists(db)
-    requireTableExists(tableIdentifier)
+    requireTableExists(qualifiedIdent)
 
     val catalogTable = externalCatalog.getTable(db, table)
     val oldDataSchema = catalogTable.dataSchema
@@ -469,14 +495,14 @@ class SessionCatalog(
    * identifier.
    */
   def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
-    val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(identifier.table)
-    val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+    val qualifiedIdent = qualifyIdentifier(identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     requireDbExists(db)
-    requireTableExists(tableIdentifier)
+    requireTableExists(qualifiedIdent)
     externalCatalog.alterTableStats(db, table, newStats)
     // Invalidate the table relation cache
-    refreshTable(identifier)
+    refreshTable(qualifiedIdent)
   }
 
   /**
@@ -484,9 +510,8 @@ class SessionCatalog(
    * with current database.
    */
   def tableExists(name: TableIdentifier): Boolean = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(name.table)
-    externalCatalog.tableExists(db, table)
+    val qualifiedIdent = qualifyIdentifier(name)
+    externalCatalog.tableExists(qualifiedIdent.database.get, qualifiedIdent.table)
   }
 
   /**
@@ -509,11 +534,12 @@ class SessionCatalog(
   @throws[NoSuchDatabaseException]
   @throws[NoSuchTableException]
   def getTableRawMetadata(name: TableIdentifier): CatalogTable = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(name.table)
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.getTable(db, table)
+    requireTableExists(qualifiedIdent)
+    attachCatalogName(externalCatalog.getTable(db, table))
   }
 
   /**
@@ -526,17 +552,17 @@ class SessionCatalog(
   @throws[NoSuchDatabaseException]
   def getTablesByName(names: Seq[TableIdentifier]): Seq[CatalogTable] = {
     if (names.nonEmpty) {
-      val dbs = names.map(_.database.getOrElse(getCurrentDatabase))
+      val qualifiedIdents = names.map(qualifyIdentifier)
+      val dbs = qualifiedIdents.map(_.database.get)
+      val tables = qualifiedIdents.map(_.table)
       if (dbs.distinct.size != 1) {
-        val tables = names.map(name => formatTableName(name.table))
         val qualifiedTableNames = dbs.zip(tables).map { case (d, t) => QualifiedTableName(d, t)}
         throw QueryCompilationErrors.cannotRetrieveTableOrViewNotInSameDatabaseError(
           qualifiedTableNames)
       }
-      val db = formatDatabaseName(dbs.head)
+      val db = dbs.head
       requireDbExists(db)
-      val tables = names.map(name => formatTableName(name.table))
-      externalCatalog.getTablesByName(db, tables)
+      externalCatalog.getTablesByName(db, tables).map(attachCatalogName)
     } else {
       Seq.empty
     }
@@ -552,10 +578,11 @@ class SessionCatalog(
       loadPath: String,
       isOverwrite: Boolean,
       isSrcLocal: Boolean): Unit = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(name.table)
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Some(db)))
+    requireTableExists(qualifiedIdent)
     externalCatalog.loadTable(db, table, loadPath, isOverwrite, isSrcLocal)
   }
 
@@ -571,20 +598,20 @@ class SessionCatalog(
       isOverwrite: Boolean,
       inheritTableSpecs: Boolean,
       isSrcLocal: Boolean): Unit = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(name.table)
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Some(db)))
+    requireTableExists(qualifiedIdent)
     requireNonEmptyValueInPartitionSpec(Seq(spec))
     externalCatalog.loadPartition(
       db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
   }
 
   def defaultTablePath(tableIdent: TableIdentifier): URI = {
-    val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
-    val dbLocation = getDatabaseMetadata(dbName).locationUri
-
-    new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
+    val qualifiedIdent = qualifyIdentifier(tableIdent)
+    val dbLocation = getDatabaseMetadata(qualifiedIdent.database.get).locationUri
+    new Path(new Path(dbLocation), qualifiedIdent.table).toUri
   }
 
   // ----------------------------------------------
@@ -598,11 +625,11 @@ class SessionCatalog(
       name: String,
       viewDefinition: TemporaryViewRelation,
       overrideIfExists: Boolean): Unit = synchronized {
-    val table = formatTableName(name)
-    if (tempViews.contains(table) && !overrideIfExists) {
+    val normalized = format(name)
+    if (tempViews.contains(normalized) && !overrideIfExists) {
       throw new TempTableAlreadyExistsException(name)
     }
-    tempViews.put(table, viewDefinition)
+    tempViews.put(normalized, viewDefinition)
   }
 
   /**
@@ -612,7 +639,7 @@ class SessionCatalog(
       name: String,
       viewDefinition: TemporaryViewRelation,
       overrideIfExists: Boolean): Unit = {
-    globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
+    globalTempViewManager.create(format(name), viewDefinition, overrideIfExists)
   }
 
   /**
@@ -622,7 +649,7 @@ class SessionCatalog(
   def alterTempViewDefinition(
       name: TableIdentifier,
       viewDefinition: TemporaryViewRelation): Boolean = synchronized {
-    val viewName = formatTableName(name.table)
+    val viewName = format(name.table)
     if (name.database.isEmpty) {
       if (tempViews.contains(viewName)) {
         createTempView(viewName, viewDefinition, overrideIfExists = true)
@@ -630,7 +657,7 @@ class SessionCatalog(
       } else {
         false
       }
-    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+    } else if (format(name.database.get) == globalTempViewManager.database) {
       globalTempViewManager.update(viewName, viewDefinition)
     } else {
       false
@@ -641,7 +668,7 @@ class SessionCatalog(
    * Return a local temporary view exactly as it was stored.
    */
   def getRawTempView(name: String): Option[TemporaryViewRelation] = synchronized {
-    tempViews.get(formatTableName(name))
+    tempViews.get(format(name))
   }
 
   /**
@@ -659,7 +686,7 @@ class SessionCatalog(
    * Return a global temporary view exactly as it was stored.
    */
   def getRawGlobalTempView(name: String): Option[TemporaryViewRelation] = {
-    globalTempViewManager.get(formatTableName(name))
+    globalTempViewManager.get(format(name))
   }
 
   /**
@@ -675,7 +702,7 @@ class SessionCatalog(
    * Returns true if this view is dropped successfully, false otherwise.
    */
   def dropTempView(name: String): Boolean = synchronized {
-    tempViews.remove(formatTableName(name)).isDefined
+    tempViews.remove(format(name)).isDefined
   }
 
   /**
@@ -684,7 +711,7 @@ class SessionCatalog(
    * Returns true if this view is dropped successfully, false otherwise.
    */
   def dropGlobalTempView(name: String): Boolean = {
-    globalTempViewManager.remove(formatTableName(name))
+    globalTempViewManager.remove(format(name))
   }
 
   // -------------------------------------------------------------
@@ -701,10 +728,10 @@ class SessionCatalog(
    * current database.
    */
   def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
-    val table = formatTableName(name.table)
+    val table = format(name.table)
     if (name.database.isEmpty) {
       tempViews.get(table).map(_.tableMeta).getOrElse(getTableMetadata(name))
-    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+    } else if (format(name.database.get) == globalTempViewManager.database) {
       globalTempViewManager.get(table).map(_.tableMeta)
         .getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
     } else {
@@ -722,23 +749,23 @@ class SessionCatalog(
    * This assumes the database specified in `newName` matches the one in `oldName`.
    */
   def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
-    val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
-    newName.database.map(formatDatabaseName).foreach { newDb =>
+    val qualifiedIdent = qualifyIdentifier(oldName)
+    val db = qualifiedIdent.database.get
+    newName.database.map(format).foreach { newDb =>
       if (db != newDb) {
         throw QueryCompilationErrors.renameTableSourceAndDestinationMismatchError(db, newDb)
       }
     }
 
-    val oldTableName = formatTableName(oldName.table)
-    val newTableName = formatTableName(newName.table)
+    val oldTableName = qualifiedIdent.table
+    val newTableName = format(newName.table)
     if (db == globalTempViewManager.database) {
       globalTempViewManager.rename(oldTableName, newTableName)
     } else {
       requireDbExists(db)
       if (oldName.database.isDefined || !tempViews.contains(oldTableName)) {
         validateName(newTableName)
-        validateNewLocationOfRename(
-          TableIdentifier(oldTableName, Some(db)), TableIdentifier(newTableName, Some(db)))
+        validateNewLocationOfRename(qualifiedIdent, qualifyIdentifier(newName))
         externalCatalog.renameTable(db, oldTableName, newTableName)
       } else {
         if (newName.database.isDefined) {
@@ -767,8 +794,9 @@ class SessionCatalog(
       name: TableIdentifier,
       ignoreIfNotExists: Boolean,
       purge: Boolean): Unit = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     if (db == globalTempViewManager.database) {
       val viewExists = globalTempViewManager.remove(table)
       if (!viewExists && !ignoreIfNotExists) {
@@ -779,7 +807,7 @@ class SessionCatalog(
         requireDbExists(db)
         // When ignoreIfNotExists is false, no exception is issued when the table does not exist.
         // Instead, log it as an error message.
-        if (tableExists(TableIdentifier(table, Option(db)))) {
+        if (tableExists(qualifiedIdent)) {
           externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
         } else if (!ignoreIfNotExists) {
           throw new NoSuchTableException(db = db, table = table)
@@ -806,29 +834,28 @@ class SessionCatalog(
    *
    * @param name The name of the table/view that we look up.
    */
-  def lookupRelation(name: TableIdentifier): LogicalPlan = {
-    synchronized {
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
-      val table = formatTableName(name.table)
-      if (db == globalTempViewManager.database) {
-        globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(table, db, getTempViewPlan(viewDef))
-        }.getOrElse(throw new NoSuchTableException(db, table))
-      } else if (name.database.isDefined || !tempViews.contains(table)) {
-        val metadata = externalCatalog.getTable(db, table)
-        getRelation(metadata)
-      } else {
-        SubqueryAlias(table, getTempViewPlan(tempViews(table)))
-      }
+  def lookupRelation(name: TableIdentifier): LogicalPlan = synchronized {
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
+    if (db == globalTempViewManager.database) {
+      globalTempViewManager.get(table).map { viewDef =>
+        SubqueryAlias(table, db, getTempViewPlan(viewDef))
+      }.getOrElse(throw new NoSuchTableException(db, table))
+    } else if (name.database.isDefined || !tempViews.contains(table)) {
+      val metadata = externalCatalog.getTable(db, table)
+      getRelation(metadata)
+    } else {
+      SubqueryAlias(table, getTempViewPlan(tempViews(table)))
     }
   }
 
   def getRelation(
       metadata: CatalogTable,
       options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()): LogicalPlan = {
-    val name = metadata.identifier
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
+    val qualifiedIdent = qualifyIdentifier(metadata.identifier)
+    val db = qualifiedIdent.database.get
+    val table = qualifiedIdent.table
     val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table)
 
     if (metadata.tableType == CatalogTableType.VIEW) {
@@ -939,16 +966,16 @@ class SessionCatalog(
   }
 
   def lookupTempView(table: String): Option[SubqueryAlias] = {
-    val formattedTable = formatTableName(table)
+    val formattedTable = format(table)
     getTempView(formattedTable).map { view =>
       SubqueryAlias(formattedTable, view)
     }
   }
 
   def lookupGlobalTempView(db: String, table: String): Option[SubqueryAlias] = {
-    val formattedDB = formatDatabaseName(db)
+    val formattedDB = format(db)
     if (formattedDB == globalTempViewManager.database) {
-      val formattedTable = formatTableName(table)
+      val formattedTable = format(table)
       getGlobalTempView(formattedTable).map { view =>
         SubqueryAlias(formattedTable, formattedDB, view)
       }
@@ -1036,7 +1063,7 @@ class SessionCatalog(
       db: String,
       pattern: String,
       includeLocalTempViews: Boolean): Seq[TableIdentifier] = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     val dbTables = if (dbName == globalTempViewManager.database) {
       globalTempViewManager.listViewNames(pattern).map { name =>
         TableIdentifier(name, Some(globalTempViewManager.database))
@@ -1059,7 +1086,7 @@ class SessionCatalog(
    * List all matching views in the specified database, including local temporary views.
    */
   def listViews(db: String, pattern: String): Seq[TableIdentifier] = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     val dbViews = if (dbName == globalTempViewManager.database) {
       globalTempViewManager.listViewNames(pattern).map { name =>
         TableIdentifier(name, Some(globalTempViewManager.database))
@@ -1109,9 +1136,8 @@ class SessionCatalog(
    */
   def refreshTable(name: TableIdentifier): Unit = synchronized {
     lookupTempView(name).map(_.refresh).getOrElse {
-      val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
-      val tableName = formatTableName(name.table)
-      val qualifiedTableName = QualifiedTableName(dbName, tableName)
+      val qualifiedIdent = qualifyIdentifier(name)
+      val qualifiedTableName = QualifiedTableName(qualifiedIdent.database.get, qualifiedIdent.table)
       tableRelationCache.invalidate(qualifiedTableName)
     }
   }
@@ -1144,14 +1170,14 @@ class SessionCatalog(
       tableName: TableIdentifier,
       parts: Seq[CatalogTablePartition],
       ignoreIfExists: Boolean): Unit = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
-    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireTableExists(qualifiedIdent)
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(qualifiedIdent))
     requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
     externalCatalog.createPartitions(
-      db, table, partitionWithQualifiedPath(tableName, parts), ignoreIfExists)
+      db, qualifiedIdent.table, partitionWithQualifiedPath(qualifiedIdent, parts), ignoreIfExists)
   }
 
   /**
@@ -1164,13 +1190,14 @@ class SessionCatalog(
       ignoreIfNotExists: Boolean,
       purge: Boolean,
       retainData: Boolean): Unit = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
-    requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+    requireTableExists(qualifiedIdent)
+    requirePartialMatchedPartitionSpec(specs, getTableMetadata(qualifiedIdent))
     requireNonEmptyValueInPartitionSpec(specs)
-    externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
+    externalCatalog.dropPartitions(
+      db, qualifiedIdent.table, specs, ignoreIfNotExists, purge, retainData)
   }
 
   /**
@@ -1183,16 +1210,16 @@ class SessionCatalog(
       tableName: TableIdentifier,
       specs: Seq[TablePartitionSpec],
       newSpecs: Seq[TablePartitionSpec]): Unit = {
-    val tableMetadata = getTableMetadata(tableName)
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
+    val tableMetadata = getTableMetadata(qualifiedIdent)
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
+    requireTableExists(qualifiedIdent)
     requireExactMatchedPartitionSpec(specs, tableMetadata)
     requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
     requireNonEmptyValueInPartitionSpec(specs)
     requireNonEmptyValueInPartitionSpec(newSpecs)
-    externalCatalog.renamePartitions(db, table, specs, newSpecs)
+    externalCatalog.renamePartitions(db, qualifiedIdent.table, specs, newSpecs)
   }
 
   /**
@@ -1205,13 +1232,14 @@ class SessionCatalog(
    * this becomes a no-op.
    */
   def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
-    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+    requireTableExists(qualifiedIdent)
+    requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(qualifiedIdent))
     requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
-    externalCatalog.alterPartitions(db, table, partitionWithQualifiedPath(tableName, parts))
+    externalCatalog.alterPartitions(
+      db, qualifiedIdent.table, partitionWithQualifiedPath(qualifiedIdent, parts))
   }
 
   /**
@@ -1219,13 +1247,13 @@ class SessionCatalog(
    * If no database is specified, assume the table is in the current database.
    */
   def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
-    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+    requireTableExists(qualifiedIdent)
+    requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(qualifiedIdent))
     requireNonEmptyValueInPartitionSpec(Seq(spec))
-    externalCatalog.getPartition(db, table, spec)
+    externalCatalog.getPartition(db, qualifiedIdent.table, spec)
   }
 
   /**
@@ -1238,15 +1266,15 @@ class SessionCatalog(
   def listPartitionNames(
       tableName: TableIdentifier,
       partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
+    requireTableExists(qualifiedIdent)
     partialSpec.foreach { spec =>
-      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(qualifiedIdent))
       requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
-    externalCatalog.listPartitionNames(db, table, partialSpec)
+    externalCatalog.listPartitionNames(db, qualifiedIdent.table, partialSpec)
   }
 
   /**
@@ -1259,15 +1287,15 @@ class SessionCatalog(
   def listPartitions(
       tableName: TableIdentifier,
       partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
+    requireTableExists(qualifiedIdent)
     partialSpec.foreach { spec =>
-      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+      requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(qualifiedIdent))
       requireNonEmptyValueInPartitionSpec(Seq(spec))
     }
-    externalCatalog.listPartitions(db, table, partialSpec)
+    externalCatalog.listPartitions(db, qualifiedIdent.table, partialSpec)
   }
 
   /**
@@ -1277,11 +1305,12 @@ class SessionCatalog(
   def listPartitionsByFilter(
       tableName: TableIdentifier,
       predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
-    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableName.table)
+    val qualifiedIdent = qualifyIdentifier(tableName)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Option(db)))
-    externalCatalog.listPartitionsByFilter(db, table, predicates, conf.sessionLocalTimeZone)
+    requireTableExists(qualifiedIdent)
+    externalCatalog.listPartitionsByFilter(
+      db, qualifiedIdent.table, predicates, conf.sessionLocalTimeZone)
   }
 
   /**
@@ -1368,15 +1397,14 @@ class SessionCatalog(
    *                        in the specified database.
    */
   def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
-    val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    val qualifiedIdent = qualifyIdentifier(funcDefinition.identifier)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    val identifier = attachSessionCatalog(
-      FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
-    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
-    if (!functionExists(identifier)) {
+    val newFuncDefinition = funcDefinition.copy(identifier = qualifiedIdent)
+    if (!functionExists(qualifiedIdent)) {
       externalCatalog.createFunction(db, newFuncDefinition)
     } else if (!ignoreIfExists) {
-      throw new FunctionAlreadyExistsException(db = db, func = identifier.toString)
+      throw new FunctionAlreadyExistsException(db = db, func = qualifiedIdent.funcName)
     }
   }
 
@@ -1385,20 +1413,21 @@ class SessionCatalog(
    * If no database is specified, assume the function is in the current database.
    */
   def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val funcName = qualifiedIdent.funcName
     requireDbExists(db)
-    val identifier = attachSessionCatalog(name.copy(database = Some(db)))
-    if (functionExists(identifier)) {
-      if (functionRegistry.functionExists(identifier)) {
+    if (functionExists(qualifiedIdent)) {
+      if (functionRegistry.functionExists(qualifiedIdent)) {
         // If we have loaded this function into the FunctionRegistry,
         // also drop it from there.
         // For a permanent function, because we loaded it to the FunctionRegistry
         // when it's first used, we also need to drop it from the FunctionRegistry.
-        functionRegistry.dropFunction(identifier)
+        functionRegistry.dropFunction(qualifiedIdent)
       }
-      externalCatalog.dropFunction(db, name.funcName)
+      externalCatalog.dropFunction(db, funcName)
     } else if (!ignoreIfNotExists) {
-      throw new NoSuchPermanentFunctionException(db = db, func = identifier.toString)
+      throw new NoSuchPermanentFunctionException(db = db, func = funcName)
     }
   }
 
@@ -1407,22 +1436,21 @@ class SessionCatalog(
    * If no database is specified, assume the function is in the current database.
    */
   def alterFunction(funcDefinition: CatalogFunction): Unit = {
-    val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    val qualifiedIdent = qualifyIdentifier(funcDefinition.identifier)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    val identifier = attachSessionCatalog(
-      FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
-    val newFuncDefinition = funcDefinition.copy(identifier = identifier)
-    if (functionExists(identifier)) {
-      if (functionRegistry.functionExists(identifier)) {
+    val newFuncDefinition = funcDefinition.copy(identifier = qualifiedIdent)
+    if (functionExists(qualifiedIdent)) {
+      if (functionRegistry.functionExists(qualifiedIdent)) {
         // If we have loaded this function into the FunctionRegistry,
         // also drop it from there.
         // For a permanent function, because we loaded it to the FunctionRegistry
         // when it's first used, we also need to drop it from the FunctionRegistry.
-        functionRegistry.dropFunction(identifier)
+        functionRegistry.dropFunction(qualifiedIdent)
       }
       externalCatalog.alterFunction(db, newFuncDefinition)
     } else {
-      throw new NoSuchPermanentFunctionException(db = db, func = identifier.toString)
+      throw new NoSuchPermanentFunctionException(db = db, func = qualifiedIdent.funcName)
     }
   }
 
@@ -1433,9 +1461,10 @@ class SessionCatalog(
    * If no database is specified, this will return the function in the current database.
    */
   def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
     requireDbExists(db)
-    externalCatalog.getFunction(db, name.funcName)
+    externalCatalog.getFunction(db, qualifiedIdent.funcName).copy(identifier = qualifiedIdent)
   }
 
   /**
@@ -1443,9 +1472,10 @@ class SessionCatalog(
    */
   def functionExists(name: FunctionIdentifier): Boolean = {
     functionRegistry.functionExists(name) || tableFunctionRegistry.functionExists(name) || {
-      val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+      val qualifiedIdent = qualifyIdentifier(name)
+      val db = qualifiedIdent.database.get
       requireDbExists(db)
-      externalCatalog.functionExists(db, name.funcName)
+      externalCatalog.functionExists(db, qualifiedIdent.funcName)
     }
   }
 
@@ -1554,8 +1584,10 @@ class SessionCatalog(
    * Returns whether it is a persistent function. If not existed, returns false.
    */
   def isPersistentFunction(name: FunctionIdentifier): Boolean = {
-    val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
-    databaseExists(db) && externalCatalog.functionExists(db, name.funcName)
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val funcName = qualifiedIdent.funcName
+    databaseExists(db) && externalCatalog.functionExists(db, funcName)
   }
 
   /**
@@ -1660,16 +1692,16 @@ class SessionCatalog(
    * This supports both scalar and table functions.
    */
   def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
-    val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
-    val qualifiedName = attachSessionCatalog(name.copy(database = database))
-    functionRegistry.lookupFunction(qualifiedName)
-      .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val funcName = qualifiedIdent.funcName
+    functionRegistry.lookupFunction(qualifiedIdent)
+      .orElse(tableFunctionRegistry.lookupFunction(qualifiedIdent))
       .getOrElse {
-        val db = qualifiedName.database.get
         requireDbExists(db)
-        if (externalCatalog.functionExists(db, name.funcName)) {
-          val metadata = externalCatalog.getFunction(db, name.funcName)
-          makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedName))
+        if (externalCatalog.functionExists(db, funcName)) {
+          val metadata = externalCatalog.getFunction(db, funcName)
+          makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedIdent))
         } else {
           failFunctionLookup(name)
         }
@@ -1700,19 +1732,20 @@ class SessionCatalog(
       arguments: Seq[Expression],
       registry: FunctionRegistryBase[T],
       createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
-    val database = formatDatabaseName(name.database.getOrElse(currentDb))
-    val qualifiedName = name.copy(database = Some(database))
-    if (registry.functionExists(qualifiedName)) {
+    val qualifiedIdent = qualifyIdentifier(name)
+    val db = qualifiedIdent.database.get
+    val funcName = qualifiedIdent.funcName
+    if (registry.functionExists(qualifiedIdent)) {
       // This function has been already loaded into the function registry.
-      registry.lookupFunction(qualifiedName, arguments)
+      registry.lookupFunction(qualifiedIdent, arguments)
     } else {
       // The function has not been loaded to the function registry, which means
       // that the function is a persistent function (if it actually has been registered
       // in the metastore). We need to first put the function in the function registry.
       val catalogFunction = try {
-        externalCatalog.getFunction(database, qualifiedName.funcName)
+        externalCatalog.getFunction(db, funcName)
       } catch {
-        case _: AnalysisException => failFunctionLookup(qualifiedName)
+        case _: AnalysisException => failFunctionLookup(qualifiedIdent)
       }
       loadFunctionResources(catalogFunction.resources)
       // Please note that qualifiedName is provided by the user. However,
@@ -1720,14 +1753,14 @@ class SessionCatalog(
       // catalog. So, it is possible that qualifiedName is not exactly the same as
       // catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
       // At here, we preserve the input from the user.
-      val funcMetadata = catalogFunction.copy(identifier = qualifiedName)
+      val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
       registerFunction(
         funcMetadata,
         overrideIfExists = false,
         registry = registry,
         functionBuilder = createFunctionBuilder(funcMetadata))
       // Now, we need to create the Expression.
-      registry.lookupFunction(qualifiedName, arguments)
+      registry.lookupFunction(qualifiedIdent, arguments)
     }
   }
 
@@ -1794,7 +1827,7 @@ class SessionCatalog(
    * defined).
    */
   def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
-    val dbName = formatDatabaseName(db)
+    val dbName = format(db)
     requireDbExists(dbName)
     val dbFunctions = externalCatalog.listFunctions(dbName, pattern).map { f =>
       FunctionIdentifier(f, Some(dbName)) }
@@ -1805,7 +1838,8 @@ class SessionCatalog(
     functions.map {
       case f if FunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
       case f if TableFunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
-      case f => (attachSessionCatalog(f), "USER")
+      case f if f.database.isDefined => (qualifyIdentifier(f), "USER")
+      case f => (f, "USER")
     }.distinct
   }
 
@@ -1890,7 +1924,7 @@ class SessionCatalog(
       assert(oldName.database.nonEmpty)
       val databaseLocation =
         externalCatalog.getDatabase(oldName.database.get).locationUri
-      val newTableLocation = new Path(new Path(databaseLocation), formatTableName(newName.table))
+      val newTableLocation = new Path(new Path(databaseLocation), format(newName.table))
       val fs = newTableLocation.getFileSystem(hadoopConf)
       if (fs.exists(newTableLocation)) {
         throw QueryCompilationErrors.cannotOperateManagedTableWithExistingLocationError(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 2de44d6f349..926514ac62d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -17,9 +17,6 @@
 
 package org.apache.spark.sql.catalyst
 
-import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-
 /**
  * An identifier that optionally specifies a database.
  *
@@ -64,28 +61,6 @@ sealed trait CatalystIdentifier {
   override def toString: String = quotedString
 }
 
-object CatalystIdentifier {
-  private def sessionCatalogOption(database: Option[String]): Option[String] = {
-    if (!SQLConf.get.getConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME) &&
-      database.isDefined &&
-      database.get != SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)) {
-      Some(SESSION_CATALOG_NAME)
-    } else {
-      None
-    }
-  }
-
-  def attachSessionCatalog(identifier: TableIdentifier): TableIdentifier = {
-    val catalog = identifier.catalog.orElse(sessionCatalogOption(identifier.database))
-    identifier.copy(catalog = catalog)
-  }
-
-  def attachSessionCatalog(identifier: FunctionIdentifier): FunctionIdentifier = {
-    val catalog = identifier.catalog.orElse(sessionCatalogOption(identifier.database))
-    identifier.copy(catalog = catalog)
-  }
-}
-
 /**
  * Encapsulates an identifier that is either a alias name or an identifier that has table
  * name and a qualifier.
@@ -113,6 +88,7 @@ object AliasIdentifier {
  */
 case class TableIdentifier(table: String, database: Option[String], catalog: Option[String])
   extends CatalystIdentifier {
+  assert(catalog.isEmpty || database.isDefined)
 
   override val identifier: String = table
 
@@ -138,6 +114,7 @@ object TableIdentifier {
  */
 case class FunctionIdentifier(funcName: String, database: Option[String], catalog: Option[String])
   extends CatalystIdentifier {
+  assert(catalog.isEmpty || database.isDefined)
 
   override val identifier: String = funcName
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index a6c2aa8b86a..91809b6176c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.connector.catalog
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.quoteIfNeeded
@@ -133,8 +132,7 @@ private[sql] object CatalogV2Implicits {
 
     def asTableIdentifier: TableIdentifier = ident.namespace match {
       case ns if ns.isEmpty => TableIdentifier(ident.name)
-      case Array(dbName) =>
-        attachSessionCatalog(TableIdentifier(ident.name, Some(dbName)))
+      case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
       case _ =>
         throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
           quoted, "TableIdentifier")
@@ -142,8 +140,7 @@ private[sql] object CatalogV2Implicits {
 
     def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
       case ns if ns.isEmpty => FunctionIdentifier(ident.name())
-      case Array(dbName) =>
-        attachSessionCatalog(FunctionIdentifier(ident.name(), Some(dbName)))
+      case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
       case _ =>
         throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
           quoted, "FunctionIdentifier")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index 5af1322f909..21f4258bce6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.catalog
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 
 /**
@@ -137,8 +137,7 @@ private[sql] trait LookupCatalog extends Logging {
     def unapply(parts: Seq[String]): Option[TableIdentifier] = {
       def namesToTableIdentifier(names: Seq[String]): Option[TableIdentifier] = names match {
         case Seq(name) => Some(TableIdentifier(name))
-        case Seq(database, name) =>
-          Some(CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, Some(database))))
+        case Seq(database, name) => Some(TableIdentifier(name, Some(database)))
         case _ => None
       }
       parts match {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 1b0a154a3f4..df981ed33e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,12 +25,11 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -281,13 +280,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
     val catalog = newBasicCatalog()
     val tables = catalog.getTablesByName("db2", Seq("tbl1", "tbl2"))
     assert(tables.map(_.identifier.table).sorted == Seq("tbl1", "tbl2"))
-    assert(tables.forall(_.identifier.catalog.isDefined))
 
     // After renaming a table, the identifier should still be qualified with catalog.
     catalog.renameTable("db2", "tbl1", "tblone")
     val tables2 = catalog.getTablesByName("db2", Seq("tbl2", "tblone"))
     assert(tables2.map(_.identifier.table).sorted == Seq("tbl2", "tblone"))
-    assert(tables2.forall(_.identifier.catalog.isDefined))
   }
 
   test("get tables by name when some tables do not exists") {
@@ -775,7 +772,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
   test("get function") {
     val catalog = newBasicCatalog()
     assert(catalog.getFunction("db2", "func1") ==
-      CatalogFunction(FunctionIdentifier("func1", Some("db2"), Some(SESSION_CATALOG_NAME)),
+      CatalogFunction(FunctionIdentifier("func1", Some("db2")),
         funcClass, Seq.empty[FunctionResource]))
     intercept[NoSuchFunctionException] {
       catalog.getFunction("db2", "does_not_exist")
@@ -1037,7 +1034,7 @@ abstract class CatalogTestUtils {
       database: Option[String] = None,
       defaultColumns: Boolean = false): CatalogTable = {
     CatalogTable(
-      identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, database)),
+      identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
       storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
       schema = if (defaultColumns) {
@@ -1081,7 +1078,7 @@ abstract class CatalogTestUtils {
       name: String,
       props: Map[String, String]): CatalogTable = {
     CatalogTable(
-      identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, Some(db))),
+      identifier = TableIdentifier(name, Some(db)),
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
       schema = new StructType()
@@ -1095,7 +1092,7 @@ abstract class CatalogTestUtils {
 
   def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
     CatalogFunction(
-      CatalystIdentifier.attachSessionCatalog(FunctionIdentifier(name, database)),
+      FunctionIdentifier(name, database),
       funcClass,
       Seq.empty[FunctionResource])
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 9821c6bb2fe..62491e04831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -501,16 +501,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
 
   test("alter table") {
     withBasicCatalog { catalog =>
-      val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      val tbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
       catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
-      val newTbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      val newTbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
       assert(!tbl1.properties.contains("toh"))
       assert(newTbl1.properties.size == tbl1.properties.size + 1)
       assert(newTbl1.properties.get("toh") == Some("frem"))
       // Alter table without explicitly specifying database
       catalog.setCurrentDatabase("db2")
       catalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1")))
-      val newestTbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+      val newestTbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
       // For hive serde table, hive metastore will set transient_lastDdlTime in table's properties,
       // and its value will be modified, here we ignore it when comparing the two tables.
       assert(newestTbl1.copy(properties = Map.empty) == tbl1.copy(properties = Map.empty))
@@ -570,12 +570,13 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
 
   test("get table") {
     withBasicCatalog { catalog =>
-      assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
-        == catalog.externalCatalog.getTable("db2", "tbl1"))
+      val raw = catalog.externalCatalog.getTable("db2", "tbl1")
+      val withCatalog = raw.copy(
+        identifier = raw.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+      assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) == withCatalog)
       // Get table without explicitly specifying database
       catalog.setCurrentDatabase("db2")
-      assert(catalog.getTableMetadata(TableIdentifier("tbl1"))
-        == catalog.externalCatalog.getTable("db2", "tbl1"))
+      assert(catalog.getTableMetadata(TableIdentifier("tbl1")) == withCatalog)
     }
   }
 
@@ -592,12 +593,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
 
   test("get tables by name") {
     withBasicCatalog { catalog =>
+      val rawTables = catalog.externalCatalog.getTablesByName("db2", Seq("tbl1", "tbl2"))
+      val tablesWithCatalog = rawTables.map { t =>
+        t.copy(identifier = t.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+      }
       assert(catalog.getTablesByName(
         Seq(
           TableIdentifier("tbl1", Some("db2")),
           TableIdentifier("tbl2", Some("db2"))
         )
-      ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1", "tbl2")))
+      ) == tablesWithCatalog)
       // Get table without explicitly specifying database
       catalog.setCurrentDatabase("db2")
       assert(catalog.getTablesByName(
@@ -605,18 +610,22 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
           TableIdentifier("tbl1"),
           TableIdentifier("tbl2")
         )
-      ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1", "tbl2")))
+      ) == tablesWithCatalog)
     }
   }
 
   test("get tables by name when some tables do not exist") {
     withBasicCatalog { catalog =>
+      val rawTables = catalog.externalCatalog.getTablesByName("db2", Seq("tbl1"))
+      val tablesWithCatalog = rawTables.map { t =>
+        t.copy(identifier = t.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+      }
       assert(catalog.getTablesByName(
         Seq(
           TableIdentifier("tbl1", Some("db2")),
           TableIdentifier("tblnotexit", Some("db2"))
         )
-      ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+      ) == tablesWithCatalog)
       // Get table without explicitly specifying database
       catalog.setCurrentDatabase("db2")
       assert(catalog.getTablesByName(
@@ -624,7 +633,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
           TableIdentifier("tbl1"),
           TableIdentifier("tblnotexit")
         )
-      ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+      ) == tablesWithCatalog)
     }
   }
 
@@ -633,12 +642,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
     val name = "砖"
     // scalastyle:on
     withBasicCatalog { catalog =>
+      val rawTables = catalog.externalCatalog.getTablesByName("db2", Seq("tbl1"))
+      val tablesWithCatalog = rawTables.map { t =>
+        t.copy(identifier = t.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+      }
       assert(catalog.getTablesByName(
         Seq(
           TableIdentifier("tbl1", Some("db2")),
           TableIdentifier(name, Some("db2"))
         )
-      ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+      ) == tablesWithCatalog)
       // Get table without explicitly specifying database
       catalog.setCurrentDatabase("db2")
       assert(catalog.getTablesByName(
@@ -646,7 +659,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
           TableIdentifier("tbl1"),
           TableIdentifier(name)
         )
-      ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+      ) == tablesWithCatalog)
     }
   }
 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
index 2376ea026f7..0db758d5147 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -90,8 +89,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
       case (sql, table, db) =>
         inside (parseMultipartIdentifier(sql)) {
           case AsTableIdentifier(ident) =>
-            ident shouldEqual TableIdentifier(
-              table, db, if (db.isDefined) Some(SESSION_CATALOG_NAME) else None)
+            ident shouldEqual TableIdentifier(table, db)
         }
     }
     Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7e11b8b6d36..a3050b18251 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,7 +22,7 @@ import java.util.{Locale, Properties}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
@@ -636,18 +636,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
     val catalog = df.sparkSession.sessionState.catalog
-    val tableExists = catalog.tableExists(tableIdent)
-    val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
-    val tableIdentWithDB = CatalystIdentifier.attachSessionCatalog(
-      tableIdent.copy(database = Some(db)))
-    val tableName = tableIdentWithDB.unquotedString
+    val qualifiedIdent = catalog.qualifyIdentifier(tableIdent)
+    val tableExists = catalog.tableExists(qualifiedIdent)
+    val tableName = qualifiedIdent.unquotedString
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
         // Do nothing
 
       case (true, SaveMode.ErrorIfExists) =>
-        throw QueryCompilationErrors.tableAlreadyExistsError(tableIdent)
+        throw QueryCompilationErrors.tableAlreadyExistsError(qualifiedIdent)
 
       case (true, SaveMode.Overwrite) =>
         // Get all input data source or hive relations of the query.
@@ -656,7 +654,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           case relation: HiveTableRelation => relation.tableMeta.identifier
         }
 
-        val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
+        val tableRelation = df.sparkSession.table(qualifiedIdent).queryExecution.analyzed
         EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
@@ -669,12 +667,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         }
 
         // Drop the existing table
-        catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
-        createTable(tableIdentWithDB)
+        catalog.dropTable(qualifiedIdent, ignoreIfNotExists = true, purge = false)
+        createTable(qualifiedIdent)
         // Refresh the cache of the table in the catalog.
-        catalog.refreshTable(tableIdentWithDB)
+        catalog.refreshTable(qualifiedIdent)
 
-      case _ => createTable(tableIdentWithDB)
+      case _ => createTable(qualifiedIdent)
     }
   }
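
Illustration (not part of the patch): a minimal sketch of the call pattern this hunk adopts,
assuming a SessionCatalog instance named `catalog`, a current database of "default", and the
default case-insensitive resolution (the exact formatting rules live in
SessionCatalog.qualifyIdentifier).

    import org.apache.spark.sql.catalyst.TableIdentifier

    // One call fills in the database, formats the table name, and attaches the session
    // catalog, instead of each caller stitching that together by hand.
    val qualifiedIdent = catalog.qualifyIdentifier(TableIdentifier("TBL1"))
    // Expected shape: TableIdentifier("tbl1", Some("default"), Some("spark_catalog"))
    val tableExists = catalog.tableExists(qualifiedIdent)
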
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index a495e35bf2c..e168987189d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -37,10 +36,10 @@ import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 
 /**
- * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
- * to the corresponding v1 or v2 commands if the resolved catalog is the session catalog.
- *
- * We can remove this rule once we implement all the catalog functionality in `V2SessionCatalog`.
+ * Converts resolved v2 commands to v1 commands if the resolved catalog is the session catalog.
+ * Since the v2 commands are already resolved, so are the referenced tables/views/functions. This
+ * rule builds the v1 commands from qualified identifiers, so the v1 commands never qualify
+ * identifiers again, which could otherwise behave inconsistently if the current database changes.
  */
 class ResolveSessionCatalog(val catalogManager: CatalogManager)
   extends Rule[LogicalPlan] with LookupCatalog {
@@ -56,12 +55,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
         }
       }
-      AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
+      AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
 
     case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
       throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")
 
-    case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _, _) =>
+    case a @ AlterColumn(ResolvedTable(catalog, _, table: V1Table, _), _, _, _, _, _, _)
+        if isSessionCatalog(catalog) =>
       if (a.column.name.length > 1) {
         throw QueryCompilationErrors
           .operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
@@ -92,7 +92,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         dataType,
         nullable = true,
         builder.build())
-      AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
+      AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
 
     case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
       throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("RENAME COLUMN")
@@ -101,16 +101,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("DROP COLUMN")
 
     case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
-      AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
+      AlterTableSetPropertiesCommand(ident, props, isView = false)
 
     case UnsetTableProperties(ResolvedV1TableIdentifier(ident), keys, ifExists) =>
-      AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = false)
+      AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = false)
 
-    case SetViewProperties(ResolvedView(ident, _), props) =>
-      AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = true)
+    case SetViewProperties(ResolvedViewIdentifier(ident), props) =>
+      AlterTableSetPropertiesCommand(ident, props, isView = true)
 
-    case UnsetViewProperties(ResolvedView(ident, _), keys, ifExists) =>
-      AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = true)
+    case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
+      AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)
 
     case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
       DescribeDatabaseCommand(db, extended, output)
@@ -121,27 +121,26 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
       AlterDatabaseSetLocationCommand(db, location)
 
-    case RenameTable(ResolvedV1TableOrViewIdentifier(oldName), newName, isView) =>
-      AlterTableRenameCommand(
-        oldName.asTableIdentifier, attachSessionCatalog(newName.asTableIdentifier), isView)
+    case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
+      AlterTableRenameCommand(oldIdent, newName.asTableIdentifier, isView)
 
     // Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
     case DescribeRelation(
          ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) =>
-      DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended, output)
+      DescribeTableCommand(ident, partitionSpec, isExtended, output)
 
     case DescribeColumn(
          ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended, output) =>
       // For views, the column will not be resolved by `ResolveReferences` because
       // `ResolvedView` stores only the identifier.
-      DescribeColumnCommand(ident.asTableIdentifier, column.nameParts, isExtended, output)
+      DescribeColumnCommand(ident, column.nameParts, isExtended, output)
 
     case DescribeColumn(ResolvedV1TableIdentifier(ident), column, isExtended, output) =>
       column match {
         case u: UnresolvedAttribute =>
           throw QueryCompilationErrors.columnDoesNotExistError(u.name)
         case a: Attribute =>
-          DescribeColumnCommand(ident.asTableIdentifier, a.qualifier :+ a.name, isExtended, output)
+          DescribeColumnCommand(ident, a.qualifier :+ a.name, isExtended, output)
         case Alias(child, _) =>
           throw QueryCompilationErrors.commandNotSupportNestedColumnError(
             "DESC TABLE COLUMN", toPrettySQL(child))
@@ -180,10 +179,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
 
     case RefreshTable(ResolvedV1TableIdentifier(ident)) =>
-      RefreshTableCommand(ident.asTableIdentifier)
+      RefreshTableCommand(ident)
 
-    case RefreshTable(r: ResolvedView) =>
-      RefreshTableCommand(r.identifier.asTableIdentifier)
+    case RefreshTable(ResolvedViewIdentifier(ident)) =>
+      RefreshTableCommand(ident)
 
     // For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
     // session catalog and the table provider is not v2.
@@ -207,17 +206,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       }
 
     case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
-      DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
+      DropTableCommand(ident, ifExists, isView = false, purge = purge)
 
     // v1 DROP TABLE supports temp view.
     case DropTable(r: ResolvedView, ifExists, purge) =>
       if (!r.isTemp) {
         throw QueryCompilationErrors.cannotDropViewWithDropTableError
       }
-      DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)
+      val ResolvedViewIdentifier(ident) = r
+      DropTableCommand(ident, ifExists, isView = false, purge = purge)
 
-    case DropView(r: ResolvedView, ifExists) =>
-      DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false)
+    case DropView(ResolvedViewIdentifier(ident), ifExists) =>
+      DropTableCommand(ident, ifExists, isView = true, purge = false)
 
     case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
       val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
@@ -248,60 +248,60 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     // ANALYZE TABLE works on permanent views if the views are cached.
     case AnalyzeTable(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, noScan) =>
       if (partitionSpec.isEmpty) {
-        AnalyzeTableCommand(ident.asTableIdentifier, noScan)
+        AnalyzeTableCommand(ident, noScan)
       } else {
-        AnalyzePartitionCommand(ident.asTableIdentifier, partitionSpec, noScan)
+        AnalyzePartitionCommand(ident, partitionSpec, noScan)
       }
 
     case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
       AnalyzeTablesCommand(Some(db), noScan)
 
     case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
-      AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
+      AnalyzeColumnCommand(ident, columnNames, allColumns)
 
     case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
-      RepairTableCommand(ident.asTableIdentifier, addPartitions, dropPartitions)
+      RepairTableCommand(ident, addPartitions, dropPartitions)
 
     case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
       LoadDataCommand(
-        ident.asTableIdentifier,
+        ident,
         path,
         isLocal,
         isOverwrite,
         partition)
 
     case ShowCreateTable(ResolvedV1TableOrViewIdentifier(ident), asSerde, output) if asSerde =>
-      ShowCreateTableAsSerdeCommand(ident.asTableIdentifier, output)
+      ShowCreateTableAsSerdeCommand(ident, output)
 
     // If target is view, force use v1 command
     case ShowCreateTable(ResolvedViewIdentifier(ident), _, output) =>
-      ShowCreateTableCommand(ident.asTableIdentifier, output)
+      ShowCreateTableCommand(ident, output)
 
     case ShowCreateTable(ResolvedV1TableIdentifier(ident), _, output)
-      if conf.useV1Command => ShowCreateTableCommand(ident.asTableIdentifier, output)
+      if conf.useV1Command => ShowCreateTableCommand(ident, output)
 
-    case ShowCreateTable(ResolvedTable(catalog, ident, table: V1Table, _), _, output)
-      if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
-        ShowCreateTableCommand(ident.asTableIdentifier, output)
+    case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
+        if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+      ShowCreateTableCommand(table.catalogTable.identifier, output)
 
     case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
-      TruncateTableCommand(ident.asTableIdentifier, None)
+      TruncateTableCommand(ident, None)
 
     case TruncatePartition(ResolvedV1TableIdentifier(ident), partitionSpec) =>
       TruncateTableCommand(
-        ident.asTableIdentifier,
+        ident,
         Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)
 
     case ShowPartitions(
         ResolvedV1TableOrViewIdentifier(ident),
         pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
       ShowPartitionsCommand(
-        ident.asTableIdentifier,
+        ident,
         output,
         pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
 
     case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
-      val v1TableName = ident.asTableIdentifier
+      val v1TableName = ident
       val resolver = conf.resolver
       val db = ns match {
         case Some(db) if v1TableName.database.exists(!resolver(_, db.head)) =>
@@ -312,14 +312,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
       RepairTableCommand(
-        ident.asTableIdentifier,
+        ident,
         enableAddPartitions = true,
         enableDropPartitions = false,
         "ALTER TABLE RECOVER PARTITIONS")
 
     case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
       AlterTableAddPartitionCommand(
-        ident.asTableIdentifier,
+        ident,
         partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
         ifNotExists)
 
@@ -327,12 +327,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         ResolvedV1TableIdentifier(ident),
         UnresolvedPartitionSpec(from, _),
         UnresolvedPartitionSpec(to, _)) =>
-      AlterTableRenamePartitionCommand(ident.asTableIdentifier, from, to)
+      AlterTableRenamePartitionCommand(ident, from, to)
 
     case DropPartitions(
         ResolvedV1TableIdentifier(ident), specs, ifExists, purge) =>
       AlterTableDropPartitionCommand(
-        ident.asTableIdentifier,
+        ident,
         specs.asUnresolvedPartitionSpecs.map(_.spec),
         ifExists,
         purge,
@@ -344,25 +344,22 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         serdeProperties,
         partitionSpec) =>
       AlterTableSerDePropertiesCommand(
-        ident.asTableIdentifier,
+        ident,
         serdeClassName,
         serdeProperties,
         partitionSpec)
 
     case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
-      AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
+      AlterTableSetLocationCommand(ident, partitionSpec, location)
 
-    case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
-      AlterViewAsCommand(
-        ident.asTableIdentifier,
-        originalText,
-        query)
+    case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
+      AlterViewAsCommand(ident, originalText, query)
 
     case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
         properties, originalText, child, allowExisting, replace) =>
       if (isSessionCatalog(catalog)) {
         CreateViewCommand(
-          name = attachSessionCatalog(ident.asTableIdentifier),
+          name = catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier),
           userSpecifiedColumns = userSpecifiedColumns,
           comment = comment,
           properties = properties,
@@ -384,11 +381,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     // If target is view, force use v1 command
     case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
-      ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
+      ShowTablePropertiesCommand(ident, propertyKey, output)
 
     case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
         if conf.useV1Command =>
-      ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
+      ShowTablePropertiesCommand(ident, propertyKey, output)
 
     case DescribeFunction(ResolvedNonPersistentFunc(_, V1Function(info)), extended) =>
       DescribeFunctionCommand(info, extended)
@@ -405,7 +402,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
       if (isSessionCatalog(catalog)) {
-        val funcIdentifier = identifier.asFunctionIdentifier
+        val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
+          identifier.asFunctionIdentifier)
         DropFunctionCommand(funcIdentifier, ifExists, false)
       } else {
         throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "DROP FUNCTION")
@@ -413,7 +411,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
     case RefreshFunction(ResolvedPersistentFunc(catalog, identifier, _)) =>
       if (isSessionCatalog(catalog)) {
-        val funcIdentifier = identifier.asFunctionIdentifier
+        val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
+          identifier.asFunctionIdentifier)
         RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
       } else {
         throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
@@ -431,7 +430,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         }
         val identifier = FunctionIdentifier(ident.name(), database)
         CreateFunctionCommand(
-          identifier,
+          catalogManager.v1SessionCatalog.qualifyIdentifier(identifier),
           className,
           resources,
           false,
@@ -451,9 +450,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       ignoreIfExists: Boolean,
       storageFormat: CatalogStorageFormat,
       provider: String): CreateTableV1 = {
-    val tableDesc = buildCatalogTable(attachSessionCatalog(ident.asTableIdentifier), tableSchema,
-        partitioning, tableSpec.properties, provider,
-        tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
+    val tableDesc = buildCatalogTable(
+      catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier),
+      tableSchema, partitioning, tableSpec.properties, provider,
+      tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
     val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
     CreateTableV1(tableDesc, mode, query)
   }
@@ -558,29 +558,29 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
   }
 
   object ResolvedViewIdentifier {
-    def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
-      case ResolvedView(ident, _) => Some(ident)
-      case _ => None
-    }
-  }
-
-  object ResolvedV1TableAndIdentifier {
-    def unapply(resolved: LogicalPlan): Option[(V1Table, Identifier)] = resolved match {
-      case ResolvedTable(catalog, ident, table: V1Table, _) if isSessionCatalog(catalog) =>
-        Some(table -> ident)
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedView(ident, isTemp) =>
+        if (isTemp) {
+          Some(ident.asTableIdentifier)
+        } else {
+          // TODO: we should get the v1 identifier from v1 view directly, similar to `V1Table`. But
+          //       there is no general view representation in DS v2 yet.
+          Some(catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier))
+        }
       case _ => None
     }
   }
 
   object ResolvedV1TableIdentifier {
-    def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
-      case ResolvedTable(catalog, ident, _: V1Table, _) if isSessionCatalog(catalog) => Some(ident)
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+      case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+        Some(t.catalogTable.identifier)
       case _ => None
     }
   }
 
   object ResolvedV1TableOrViewIdentifier {
-    def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
+    def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
       case ResolvedV1TableIdentifier(ident) => Some(ident)
       case ResolvedViewIdentifier(ident) => Some(ident)
       case _ => None
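
Illustration (not part of the patch): how the reworked extractors are meant to be used, sketched
as it would appear inside this rule against a hypothetical resolved `plan`.
ResolvedV1TableIdentifier now returns the qualified TableIdentifier taken from the V1Table's
catalogTable, so the v1 command built from it never re-derives the database or catalog name.

    plan match {
      case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
        // ident already carries the database and catalog, e.g.
        // TableIdentifier("tbl1", Some("db2"), Some("spark_catalog"))
        DropTableCommand(ident, ifExists, isView = false, purge = purge)
      case other =>
        other // anything else is left to the remaining resolution rules
    }
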
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index a66eac63071..c20b89da95f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
@@ -100,8 +100,12 @@ case class DescribeFunctionCommand(
   }
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val identifier = CatalystIdentifier.attachSessionCatalog(
-      FunctionIdentifier(info.getName, Option(info.getDb)))
+    val identifier = if (info.getDb != null) {
+      sparkSession.sessionState.catalog.qualifyIdentifier(
+        FunctionIdentifier(info.getName, Some(info.getDb)))
+    } else {
+      FunctionIdentifier(info.getName)
+    }
     val name = identifier.unquotedString
     val result = if (info.getClassName != null) {
       Row(s"Function: $name") ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index a96857ab102..c0b4f8f25e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -207,8 +207,9 @@ case class AlterTableRenameCommand(
       // back into the hive metastore cache
       catalog.refreshTable(oldName)
       catalog.renameTable(oldName, newName)
+      val newQualifiedIdent = catalog.qualifyIdentifier(oldName.copy(table = newName.table))
       optStorageLevel.foreach { storageLevel =>
-        sparkSession.catalog.cacheTable(newName.unquotedString, storageLevel)
+        sparkSession.catalog.cacheTable(newQualifiedIdent.unquotedString, storageLevel)
       }
     }
     Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3a2e32cfdc7..4a6d876ea17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -107,8 +107,9 @@ case class CreateViewCommand(
 
     // When creating a permanent view, not allowed to reference temporary objects.
     // This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
-    verifyTemporaryObjectsNotExists(isTemporary, name, analyzedPlan, referredTempFunctions)
-    verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, name)
+    val qualifiedName = catalog.qualifyIdentifier(name)
+    verifyTemporaryObjectsNotExists(isTemporary, qualifiedName, analyzedPlan, referredTempFunctions)
+    verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, qualifiedName)
 
     if (viewType == LocalTempView) {
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
@@ -136,16 +137,16 @@ case class CreateViewCommand(
         aliasedPlan,
         referredTempFunctions)
       catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
-    } else if (catalog.tableExists(name)) {
-      val tableMetadata = catalog.getTableMetadata(name)
+    } else if (catalog.tableExists(qualifiedName)) {
+      val tableMetadata = catalog.getTableMetadata(qualifiedName)
+      val viewIdent = tableMetadata.identifier
       if (allowExisting) {
         // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
         // already exists.
       } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
-        throw QueryCompilationErrors.tableIsNotViewError(name)
+        throw QueryCompilationErrors.tableIsNotViewError(viewIdent)
       } else if (replace) {
         // Detect cyclic view reference on CREATE OR REPLACE VIEW.
-        val viewIdent = tableMetadata.identifier
         checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
 
         // uncache the cached data before replacing an existing view
@@ -159,7 +160,7 @@ case class CreateViewCommand(
       } else {
         // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
         // exists.
-        throw QueryCompilationErrors.viewAlreadyExistsError(name)
+        throw QueryCompilationErrors.viewAlreadyExistsError(viewIdent)
       }
     } else {
       // Create the view if it doesn't exist.
@@ -252,8 +253,9 @@ case class AlterViewAsCommand(
 
   override def run(session: SparkSession): Seq[Row] = {
     val isTemporary = session.sessionState.catalog.isTempView(name)
-    verifyTemporaryObjectsNotExists(isTemporary, name, query, referredTempFunctions)
-    verifyAutoGeneratedAliasesNotExists(query, isTemporary, name)
+    val qualifiedName = session.sessionState.catalog.qualifyIdentifier(name)
+    verifyTemporaryObjectsNotExists(isTemporary, qualifiedName, query, referredTempFunctions)
+    verifyAutoGeneratedAliasesNotExists(query, isTemporary, qualifiedName)
     if (isTemporary) {
       alterTemporaryView(session, query)
     } else {
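
Illustration (not part of the patch): the qualification step both view commands now share,
assuming a SessionCatalog instance named `catalog` and a view name that was parsed without a
database part.

    import org.apache.spark.sql.catalyst.TableIdentifier

    val qualifiedName = catalog.qualifyIdentifier(TableIdentifier("v1"))
    // Existence checks and error messages then report the qualified identifier
    // (e.g. spark_catalog.default.v1) rather than the raw user-supplied name.
    if (catalog.tableExists(qualifiedName)) {
      val tableMetadata = catalog.getTableMetadata(qualifiedName)
      // handle allowExisting / replace / error as in CreateViewCommand above
    }
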
diff --git a/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out b/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out
index 190356aad7f..08c13ccc7e0 100644
--- a/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out
@@ -294,7 +294,7 @@ SHOW CREATE TABLE view_SPARK_30302 AS SERDE
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
   aaa,
   bbb)
 AS SELECT a, b FROM tbl
@@ -305,7 +305,7 @@ SHOW CREATE TABLE view_SPARK_30302
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
   aaa,
   bbb)
 AS SELECT a, b FROM tbl
@@ -334,7 +334,7 @@ SHOW CREATE TABLE view_SPARK_30302 AS SERDE
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
   aaa COMMENT 'comment with \'quoted text\' for aaa',
   bbb)
 COMMENT 'This is a comment with \'quoted text\' for view'
@@ -346,7 +346,7 @@ SHOW CREATE TABLE view_SPARK_30302
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
   aaa COMMENT 'comment with \'quoted text\' for aaa',
   bbb)
 COMMENT 'This is a comment with \'quoted text\' for view'
@@ -376,7 +376,7 @@ SHOW CREATE TABLE view_SPARK_30302 AS SERDE
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
   aaa,
   bbb)
 TBLPROPERTIES (
@@ -390,7 +390,7 @@ SHOW CREATE TABLE view_SPARK_30302
 -- !query schema
 struct<createtab_stmt:string>
 -- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
   aaa,
   bbb)
 TBLPROPERTIES (
diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
index 6f674e1166c..0605af1c808 100644
--- a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
@@ -31,7 +31,7 @@ SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function spark_catalog.default.myDoubleAvg. Expected: 1; Found: 2; line 1 pos 7
+Invalid number of arguments for function spark_catalog.default.mydoubleavg. Expected: 1; Found: 2; line 1 pos 7
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
index e7c9cd016c3..80a3d9af942 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
@@ -31,7 +31,7 @@ SELECT default.myDoubleAvg(udf(int_col1), udf(3)) as my_avg from t1
 struct<>
 -- !query output
 org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function spark_catalog.default.myDoubleAvg. Expected: 1; Found: 2; line 1 pos 7
+Invalid number of arguments for function spark_catalog.default.mydoubleavg. Expected: 1; Found: 2; line 1 pos 7
 
 
 -- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index e261d010444..016e24a1c4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -252,7 +252,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
       e = intercept[AnalysisException] {
         sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
       }.getMessage
-      assert(e.contains("default.testView is a view. 'LOAD DATA' expects a table"))
+      assert(e.contains("default.testview is a view. 'LOAD DATA' expects a table"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 44beb3a1697..c4a787cb891 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclStatus}
 import org.apache.spark.{SparkException, SparkFiles}
 import org.apache.spark.internal.config
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -66,7 +66,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
       .add("col1", "int", nullable = true, metadata = metadata)
       .add("col2", "string")
     CatalogTable(
-      identifier = CatalystIdentifier.attachSessionCatalog(name),
+      identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
       schema = schema.copy(
@@ -698,7 +698,8 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
     createDatabase(catalog, "dbx")
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
     createTable(catalog, tableIdent1)
-    val expectedTable = generateTable(catalog, tableIdent1)
+    val expectedTable = generateTable(
+      catalog, tableIdent1.copy(catalog = Some(SESSION_CATALOG_NAME)))
     checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
   }
 
@@ -783,7 +784,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
         sql("ALTER TABLE tab1 RENAME TO default.tab2")
       }
       assert(e.getMessage.contains(
-        s"RENAME TEMPORARY VIEW from '`tab1`' to '`$SESSION_CATALOG_NAME`.`default`.`tab2`': " +
+        s"RENAME TEMPORARY VIEW from '`tab1`' to '`default`.`tab2`': " +
           "cannot specify database name 'default' in the destination table"))
 
       val catalog = spark.sessionState.catalog
@@ -808,7 +809,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
         sql("ALTER TABLE view1 RENAME TO default.tab2")
       }
       assert(e.getMessage.contains(
-        s"RENAME TEMPORARY VIEW from '`view1`' to '`$SESSION_CATALOG_NAME`.`default`.`tab2`': " +
+        s"RENAME TEMPORARY VIEW from '`view1`' to '`default`.`tab2`': " +
           "cannot specify database name 'default' in the destination table"))
 
       val catalog = spark.sessionState.catalog
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 610fcd54b2a..56362749cf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -126,30 +126,20 @@ class PlanResolutionSuite extends AnalysisTest {
     t
   }
 
-  private val v1Table: V1Table = {
+  private def createV1TableMock(
+      ident: Identifier,
+      provider: String = v1Format,
+      tableType: CatalogTableType = CatalogTableType.MANAGED): V1Table = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
     val t = mock(classOf[CatalogTable])
     when(t.schema).thenReturn(new StructType()
       .add("i", "int")
       .add("s", "string")
       .add("point", new StructType().add("x", "int").add("y", "int")))
-    when(t.tableType).thenReturn(CatalogTableType.MANAGED)
-    when(t.provider).thenReturn(Some(v1Format))
-    V1Table(t)
-  }
-
-  private val v1HiveTable: V1Table = {
-    val t = mock(classOf[CatalogTable])
-    when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
-    when(t.tableType).thenReturn(CatalogTableType.MANAGED)
-    when(t.provider).thenReturn(Some("hive"))
-    V1Table(t)
-  }
-
-  private val view: V1Table = {
-    val t = mock(classOf[CatalogTable])
-    when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
-    when(t.tableType).thenReturn(CatalogTableType.VIEW)
-    when(t.provider).thenReturn(Some(v1Format))
+    when(t.tableType).thenReturn(tableType)
+    when(t.provider).thenReturn(Some(provider))
+    when(t.identifier).thenReturn(
+      ident.asTableIdentifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
     V1Table(t)
   }
 
@@ -174,14 +164,14 @@ class PlanResolutionSuite extends AnalysisTest {
   private val v2SessionCatalog: TableCatalog = {
     val newCatalog = mock(classOf[TableCatalog])
     when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => {
-      invocation.getArgument[Identifier](0).name match {
-        case "v1Table" => v1Table
-        case "v1Table1" => v1Table
-        case "v1HiveTable" => v1HiveTable
+      val ident = invocation.getArgument[Identifier](0)
+      ident.name match {
+        case "v1Table" | "v1Table1" => createV1TableMock(ident)
+        case "v1HiveTable" => createV1TableMock(ident, provider = "hive")
         case "v2Table" => table
         case "v2Table1" => table1
         case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
-        case "view" => view
+        case "view" => createV1TableMock(ident, tableType = CatalogTableType.VIEW)
         case name => throw new NoSuchTableException(name)
       }
     })
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala
index e46e2ce0d14..e49d470f322 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command.v1
 
+import java.util.Locale
+
 import test.org.apache.spark.sql.MyDoubleSum
 
 import org.apache.spark.sql.execution.command
@@ -37,6 +39,10 @@ trait ShowFunctionsSuiteBase extends command.ShowFunctionsSuiteBase
   override protected def dropFunction(name: String): Unit = {
     sql(s"DROP FUNCTION IF EXISTS $name")
   }
+  override protected def qualifiedFunName(ns: String, name: String): String = {
+    // `SessionCatalog` lower-cases function names before creating them.
+    super.qualifiedFunName(ns, name).toLowerCase(Locale.ROOT)
+  }
 }
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index deb8a458110..94663e5c2ec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -49,7 +49,7 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -503,8 +503,7 @@ private[hive] class HiveClientImpl(
     val comment = properties.get("comment")
 
     CatalogTable(
-      identifier = CatalystIdentifier.attachSessionCatalog(
-        TableIdentifier(h.getTableName, Option(h.getDbName))),
+      identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
       tableType = h.getTableType match {
         case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
         case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 53feae0d8e8..95e5582cb8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.serde.serdeConstants
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, InternalRow}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, CatalogTablePartition, CatalogUtils, ExternalCatalogUtils, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -809,8 +809,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   }
 
   private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
-    val name = CatalystIdentifier.attachSessionCatalog(
-      FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName)))
+    val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
     val resources = hf.getResourceUris.asScala.map { uri =>
       val resourceType = uri.getResourceType() match {
         case ResourceType.ARCHIVE => "archive"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f6165e0f9d6..d755dce90e0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -336,7 +336,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         }.getMessage
 
         assert(
-          message.contains(s"Table $SESSION_CATALOG_NAME.default.ctasJsonTable already exists."),
+          message.contains(s"Table $SESSION_CATALOG_NAME.default.ctasjsontable already exists."),
           "We should complain that ctasJsonTable already exists")
 
         // The following statement should be fine if it has IF NOT EXISTS.
@@ -526,7 +526,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
             intercept[AnalysisException] {
               sparkSession.catalog.createTable("createdJsonTable", jsonFilePath.toString)
             }.getMessage.contains(
-              s"Table $SESSION_CATALOG_NAME.default.createdJsonTable already exists."))
+              s"Table $SESSION_CATALOG_NAME.default.createdjsontable already exists."))
         }
 
         // Data should not be deleted.
@@ -909,7 +909,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
       }
       assert(e.getMessage.contains("The format of the existing table " +
-        s"$SESSION_CATALOG_NAME.default.appendOrcToParquet is `Parquet"))
+        s"$SESSION_CATALOG_NAME.default.appendorctoparquet is `Parquet"))
     }
 
     withTable("appendParquetToJson") {
@@ -920,7 +920,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }.getMessage
 
       assert(msg.contains("The format of the existing table " +
-        s"$SESSION_CATALOG_NAME.default.appendParquetToJson is `Json"))
+        s"$SESSION_CATALOG_NAME.default.appendparquettojson is `Json"))
     }
 
     withTable("appendTextToJson") {
@@ -931,7 +931,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }.getMessage
       // The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
       assert(msg.contains("The format of the existing table " +
-        s"$SESSION_CATALOG_NAME.default.appendTextToJson is `Json"))
+        s"$SESSION_CATALOG_NAME.default.appendtexttojson is `Json"))
     }
   }
 
@@ -1251,7 +1251,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       e = intercept[AnalysisException] {
         table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
       }.getMessage
-      assert(e.contains(s"Table `$tableName` already exists"))
+      assert(e.contains(s"Table `$SESSION_CATALOG_NAME`.`default`.`$tableName` already exists"))
     }
   }
 
@@ -1346,8 +1346,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
       withDebugMode {
         val tableMeta = sharedState.externalCatalog.getTable("default", "t")
-        assert(tableMeta.identifier ==
-          TableIdentifier("t", Some("default"), Some(SESSION_CATALOG_NAME)))
+        assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
         assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
       }
     } finally {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9fd2e6e7f62..87773d502ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
@@ -93,7 +93,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
       .add("col1", "int", nullable = true, metadata = metadata)
       .add("col2", "string")
     CatalogTable(
-      identifier = CatalystIdentifier.attachSessionCatalog(name),
+      identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
       schema = schema.copy(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala
index 95d7af00de3..84d1c209dd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import java.util.Locale
-
 import org.apache.spark.sql.execution.command.v1
 
 /**
@@ -26,8 +24,4 @@ import org.apache.spark.sql.execution.command.v1
  */
 class ShowFunctionsSuite extends v1.ShowFunctionsSuiteBase with CommandSuiteBase {
   override def commandVersion: String = super[ShowFunctionsSuiteBase].commandVersion
-  override def qualifiedFunName(ns: String, name: String): String = {
-    // Hive Metastore lower-cases all identifiers.
-    super.qualifiedFunName(ns, name).toLowerCase(Locale.ROOT)
-  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 2637a1d18e3..ce800e88218 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -384,7 +385,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
       val msg = intercept[AnalysisException] {
         testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
       }.getMessage
-      assert(msg.contains("Table `t` already exists"))
+      assert(msg.contains(s"Table `$SESSION_CATALOG_NAME`.`default`.`t` already exists"))
     }
   }
 

