You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/08 16:06:56 UTC

[spark] branch master updated: [SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9175c9da7 [SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl
5c9175c9da7 is described below

commit 5c9175c9da719536123477d7fcc784a4086fbe25
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Aug 9 00:06:19 2022 +0800

    [SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl
    
    ### What changes were proposed in this pull request?
    
    `CatalogImpl` has been updated quite a bit recently, to support v2 catalogs. This PR revisits the recent changes and refines the code a little bit:
    1. fix the naming "3 layer namespace". The spark catalog plugin supports n-part namespace. This PR changes it to `qualified name with catalog`.
    2. always use the v2 code path. Today the v2 code path can already cover all the functionalities of `CatalogImpl` and it's unnecessary to keep the v1 code path in `CatalogImpl`. It also makes sure the behavior is consistent between `db.table` and `spark_catalog.db.table`. Previously it was not consistent in some cases, see the updated tests for functions.
    3. simplify `try {v1 code path} catch {... v2 code path}` to `val name = if (table exists in HMS) {name qualified with spark_catalog} else {parsed name}; v2 code path`
    
    ### Why are the changes needed?
    
    code cleanup.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #37287 from cloud-fan/catalog.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 R/pkg/tests/fulltests/test_sparkSQL.R              |   5 +-
 python/pyspark/sql/catalog.py                      |   6 +-
 python/pyspark/sql/tests/test_catalog.py           |   2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  18 +-
 .../org/apache/spark/sql/catalog/Catalog.scala     | 102 ++---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 474 ++++++++++-----------
 .../spark/sql/execution/GlobalTempViewSuite.scala  |   3 +-
 .../apache/spark/sql/internal/CatalogSuite.scala   |  81 +++-
 8 files changed, 362 insertions(+), 329 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index fc54d89a1a4..27994ed76b2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4098,14 +4098,13 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
                c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
   expect_equal(collect(c)[[1]][[1]], "speed")
   expect_error(listColumns("zxwtyswklpf", "default"),
-               paste("Error in listColumns : analysis error - Table",
-                     "'zxwtyswklpf' does not exist in database 'default'"))
+               paste("Table or view not found: spark_catalog.default.zxwtyswklpf"))
 
   f <- listFunctions()
   expect_true(nrow(f) >= 200) # 250
   expect_equal(colnames(f),
                c("name", "catalog", "namespace", "description", "className", "isTemporary"))
-  expect_equal(take(orderBy(f, "className"), 1)$className,
+  expect_equal(take(orderBy(filter(f, "className IS NOT NULL"), "className"), 1)$className,
                "org.apache.spark.sql.catalyst.expressions.Abs")
   expect_error(listFunctions("zxwtyswklpf_db"),
                paste("Error in listFunctions : no such database - Database",
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 548750d7120..10c9ab5f6d2 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -164,7 +164,7 @@ class Catalog:
         Examples
         --------
         >>> spark.catalog.getDatabase("default")
-        Database(name='default', catalog=None, description='default database', ...
+        Database(name='default', catalog='spark_catalog', description='default database', ...
         >>> spark.catalog.getDatabase("spark_catalog.default")
         Database(name='default', catalog='spark_catalog', description='default database', ...
         """
@@ -376,9 +376,9 @@ class Catalog:
         --------
         >>> func = spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
         >>> spark.catalog.getFunction("my_func1")
-        Function(name='my_func1', catalog=None, namespace=['default'], ...
+        Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
         >>> spark.catalog.getFunction("default.my_func1")
-        Function(name='my_func1', catalog=None, namespace=['default'], ...
+        Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
         >>> spark.catalog.getFunction("spark_catalog.default.my_func1")
         Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
         >>> spark.catalog.getFunction("my_func2")
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 7d81234bce2..24cd67251a8 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -198,7 +198,7 @@ class CatalogTests(ReusedSQLTestCase):
             self.assertTrue("to_unix_timestamp" in functions)
             self.assertTrue("current_database" in functions)
             self.assertEqual(functions["+"].name, "+")
-            self.assertEqual(functions["+"].description, None)
+            self.assertEqual(functions["+"].description, "expr1 + expr2 - Returns `expr1`+`expr2`.")
             self.assertEqual(
                 functions["+"].className, "org.apache.spark.sql.catalyst.expressions.Add"
             )
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 16d89c9b2e4..a0c98aac6c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -971,13 +971,17 @@ class SessionCatalog(
   }
 
   def lookupTempView(name: TableIdentifier): Option[View] = {
-    val tableName = formatTableName(name.table)
-    if (name.database.isEmpty) {
-      tempViews.get(tableName).map(getTempViewPlan)
-    } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
-      globalTempViewManager.get(tableName).map(getTempViewPlan)
-    } else {
-      None
+    lookupLocalOrGlobalRawTempView(name.database.toSeq :+ name.table).map(getTempViewPlan)
+  }
+
+  /**
+   * Return the raw logical plan of a temporary local or global view for the given name.
+   */
+  def lookupLocalOrGlobalRawTempView(name: Seq[String]): Option[TemporaryViewRelation] = {
+    name match {
+      case Seq(v) => getRawTempView(v)
+      case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v)
+      case _ => None
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 29b35229e97..82ac8fd6049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -33,28 +33,28 @@ import org.apache.spark.storage.StorageLevel
 abstract class Catalog {
 
   /**
-   * Returns the current default database in this session.
+   * Returns the current database (namespace) in this session.
    *
    * @since 2.0.0
    */
   def currentDatabase: String
 
   /**
-   * Sets the current default database in this session.
+   * Sets the current database (namespace) in this session.
    *
    * @since 2.0.0
    */
   def setCurrentDatabase(dbName: String): Unit
 
   /**
-   * Returns a list of databases available across all sessions.
+   * Returns a list of databases (namespaces) available within the current catalog.
    *
    * @since 2.0.0
    */
   def listDatabases(): Dataset[Database]
 
   /**
-   * Returns a list of tables/views in the current database.
+   * Returns a list of tables/views in the current database (namespace).
    * This includes all temporary views.
    *
    * @since 2.0.0
@@ -62,7 +62,8 @@ abstract class Catalog {
   def listTables(): Dataset[Table]
 
   /**
-   * Returns a list of tables/views in the specified database.
+   * Returns a list of tables/views in the specified database (namespace) (the name can be qualified
+   * with catalog).
    * This includes all temporary views.
    *
    * @since 2.0.0
@@ -71,16 +72,17 @@ abstract class Catalog {
   def listTables(dbName: String): Dataset[Table]
 
   /**
-   * Returns a list of functions registered in the current database.
-   * This includes all temporary functions
+   * Returns a list of functions registered in the current database (namespace).
+   * This includes all temporary functions.
    *
    * @since 2.0.0
    */
   def listFunctions(): Dataset[Function]
 
   /**
-   * Returns a list of functions registered in the specified database.
-   * This includes all temporary functions
+   * Returns a list of functions registered in the specified database (namespace) (the name can be
+   * qualified with catalog).
+   * This includes all built-in and temporary functions.
    *
    * @since 2.0.0
    */
@@ -90,21 +92,22 @@ abstract class Catalog {
   /**
    * Returns a list of columns for the given table/view or temporary view.
    *
-   * @param tableName is either a qualified or unqualified name that designates a table/view.
-   *                  If no database identifier is provided, it refers to a temporary view or
-   *                  a table/view in the current database.
+   * @param tableName is either a qualified or unqualified name that designates a table/view. It
+   *                  follows the same resolution rule with SQL: search for temp views first then
+   *                  table/views in the current database (namespace).
    * @since 2.0.0
    */
   @throws[AnalysisException]("table does not exist")
   def listColumns(tableName: String): Dataset[Column]
 
   /**
-   * Returns a list of columns for the given table/view in the specified database.
+   * Returns a list of columns for the given table/view in the specified database under the Hive
+   * Metastore.
    *
-   * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
-   * use listColumns(tableName) instead.
+   * To list columns for table/view in other catalogs, please use `listColumns(tableName)` with
+   * qualified table/view name instead.
    *
-   * @param dbName is a name that designates a database.
+   * @param dbName is an unqualified name that designates a database.
    * @param tableName is an unqualified name that designates a table/view.
    * @since 2.0.0
    */
@@ -112,8 +115,8 @@ abstract class Catalog {
   def listColumns(dbName: String, tableName: String): Dataset[Column]
 
   /**
-   * Get the database with the specified name. This throws an AnalysisException when the database
-   * cannot be found.
+   * Get the database (namespace) with the specified name (can be qualified with catalog). This
+   * throws an AnalysisException when the database (namespace) cannot be found.
    *
    * @since 2.1.0
    */
@@ -124,20 +127,20 @@ abstract class Catalog {
    * Get the table or view with the specified name. This table can be a temporary view or a
    * table/view. This throws an AnalysisException when no Table can be found.
    *
-   * @param tableName is either a qualified or unqualified name that designates a table/view.
-   *                  If no database identifier is provided, it refers to a table/view in
-   *                  the current database.
+   * @param tableName is either a qualified or unqualified name that designates a table/view. It
+   *                  follows the same resolution rule with SQL: search for temp views first then
+   *                  table/views in the current database (namespace).
    * @since 2.1.0
    */
   @throws[AnalysisException]("table does not exist")
   def getTable(tableName: String): Table
 
   /**
-   * Get the table or view with the specified name in the specified database. This throws an
-   * AnalysisException when no Table can be found.
+   * Get the table or view with the specified name in the specified database under the Hive
+   * Metastore. This throws an AnalysisException when no Table can be found.
    *
-   * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
-   * use getTable(tableName) instead.
+   * To get table/view in other catalogs, please use `getTable(tableName)` with qualified table/view
+   * name instead.
    *
    * @since 2.1.0
    */
@@ -148,22 +151,22 @@ abstract class Catalog {
    * Get the function with the specified name. This function can be a temporary function or a
    * function. This throws an AnalysisException when the function cannot be found.
    *
-   * @param functionName is either a qualified or unqualified name that designates a function.
-   *                     If no database identifier is provided, it refers to a temporary function
-   *                     or a function in the current database.
+   * @param functionName is either a qualified or unqualified name that designates a function. It
+   *                     follows the same resolution rule with SQL: search for built-in/temp
+   *                     functions first then functions in the current database (namespace).
    * @since 2.1.0
    */
   @throws[AnalysisException]("function does not exist")
   def getFunction(functionName: String): Function
 
   /**
-   * Get the function with the specified name. This throws an AnalysisException when the function
-   * cannot be found.
+   * Get the function with the specified name in the specified database under the Hive Metastore.
+   * This throws an AnalysisException when the function cannot be found.
    *
-   * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
-   * use getFunction(functionName) instead.
+   * To get functions in other catalogs, please use `getFunction(functionName)` with qualified
+   * function name instead.
    *
-   * @param dbName is a name that designates a database.
+   * @param dbName is an unqualified name that designates a database.
    * @param functionName is an unqualified name that designates a function in the specified database
    * @since 2.1.0
    */
@@ -171,7 +174,8 @@ abstract class Catalog {
   def getFunction(dbName: String, functionName: String): Function
 
   /**
-   * Check if the database with the specified name exists.
+   * Check if the database (namespace) with the specified name exists (the name can be qualified
+   * with catalog).
    *
    * @since 2.1.0
    */
@@ -181,20 +185,21 @@ abstract class Catalog {
    * Check if the table or view with the specified name exists. This can either be a temporary
    * view or a table/view.
    *
-   * @param tableName is either a qualified or unqualified name that designates a table/view.
-   *                  If no database identifier is provided, it refers to a table/view in
-   *                  the current database.
+   * @param tableName is either a qualified or unqualified name that designates a table/view. It
+   *                  follows the same resolution rule with SQL: search for temp views first then
+   *                  table/views in the current database (namespace).
    * @since 2.1.0
    */
   def tableExists(tableName: String): Boolean
 
   /**
-   * Check if the table or view with the specified name exists in the specified database.
+   * Check if the table or view with the specified name exists in the specified database under the
+   * Hive Metastore.
    *
-   * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
-   * use tableExists(tableName) instead.
+   * To check existence of table/view in other catalogs, please use `tableExists(tableName)` with
+   * qualified table/view name instead.
    *
-   * @param dbName is a name that designates a database.
+   * @param dbName is an unqualified name that designates a database.
    * @param tableName is an unqualified name that designates a table.
    * @since 2.1.0
    */
@@ -204,20 +209,21 @@ abstract class Catalog {
    * Check if the function with the specified name exists. This can either be a temporary function
    * or a function.
    *
-   * @param functionName is either a qualified or unqualified name that designates a function.
-   *                     If no database identifier is provided, it refers to a function in
-   *                     the current database.
+   * @param functionName is either a qualified or unqualified name that designates a function. It
+   *                     follows the same resolution rule with SQL: search for built-in/temp
+   *                     functions first then functions in the current database (namespace).
    * @since 2.1.0
    */
   def functionExists(functionName: String): Boolean
 
   /**
-   * Check if the function with the specified name exists in the specified database.
+   * Check if the function with the specified name exists in the specified database under the
+   * Hive Metastore.
    *
-   * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
-   * use functionExists(functionName) instead.
+   * To check existence of functions in other catalogs, please use `functionExists(functionName)`
+   * with qualified function name instead.
    *
-   * @param dbName is a name that designates a database.
+   * @param dbName is an unqualified name that designates a database.
    * @param functionName is an unqualified name that designates a function.
    * @since 2.1.0
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e11b349777e..657ed87e609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,14 +23,14 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, MultipartIdentifierHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.ShowTablesCommand
 import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.internal.connector.V1Function
@@ -45,15 +45,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
   private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog
 
-  private def requireDatabaseExists(dbName: String): Unit = {
-    if (!sessionCatalog.databaseExists(dbName)) {
-      throw QueryCompilationErrors.databaseDoesNotExistError(dbName)
-    }
+  private def parseIdent(name: String): Seq[String] = {
+    sparkSession.sessionState.sqlParser.parseMultipartIdentifier(name)
   }
 
-  private def requireTableExists(dbName: String, tableName: String): Unit = {
-    if (!sessionCatalog.tableExists(TableIdentifier(tableName, Some(dbName)))) {
-      throw QueryCompilationErrors.tableDoesNotExistInDatabaseError(tableName, dbName)
+  private def qualifyV1Ident(nameParts: Seq[String]): Seq[String] = {
+    assert(nameParts.length == 1 || nameParts.length == 2)
+    if (nameParts.length == 1) {
+      Seq(CatalogManager.SESSION_CATALOG_NAME, sessionCatalog.getCurrentDatabase) ++ nameParts
+    } else {
+      CatalogManager.SESSION_CATALOG_NAME +: nameParts
     }
   }
 
@@ -68,32 +69,27 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("database does not exist")
   override def setCurrentDatabase(dbName: String): Unit = {
-    // we assume dbName will not include the catalog prefix. e.g. if you call
-    // setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog.
-    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
-    sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray)
+    // we assume `dbName` will not include the catalog name. e.g. if you call
+    // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+    // catalog.
+    sparkSession.sessionState.catalogManager.setCurrentNamespace(parseIdent(dbName).toArray)
   }
 
   /**
    * Returns a list of databases available across all sessions.
    */
   override def listDatabases(): Dataset[Database] = {
-    val catalog = currentCatalog()
-    val plan = ShowNamespaces(UnresolvedNamespace(Seq(catalog)), None)
-    val databases = sparkSession.sessionState.executePlan(plan).toRdd.collect()
-      .map(row => catalog + "." + row.getString(0))
-      .map(getDatabase)
+    val plan = ShowNamespaces(UnresolvedNamespace(Nil), None)
+    val qe = sparkSession.sessionState.executePlan(plan)
+    val catalog = qe.analyzed.collectFirst {
+      case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog
+    }.get
+    val databases = qe.toRdd.collect().map { row =>
+      getNamespace(catalog, parseIdent(row.getString(0)))
+    }
     CatalogImpl.makeDataset(databases, sparkSession)
   }
 
-  private def makeDatabase(dbName: String): Database = {
-    val metadata = sessionCatalog.getDatabaseMetadata(dbName)
-    new Database(
-      name = metadata.name,
-      description = metadata.description,
-      locationUri = CatalogUtils.URIToString(metadata.locationUri))
-  }
-
   /**
    * Returns a list of tables in the current database.
    * This includes all temporary tables.
@@ -110,74 +106,93 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   override def listTables(dbName: String): Dataset[Table] = {
     // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
     // a qualified namespace with catalog name. We assume it's a single database name
-    // and check if we can find the dbName in sessionCatalog. If so we listTables under
-    // that database. Otherwise we try 3-part name parsing and locate the database.
-    if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
-      val tables = sessionCatalog.listTables(dbName).map(makeTable)
-      CatalogImpl.makeDataset(tables, sparkSession)
+    // and check if we can find it in the sessionCatalog. If so we list tables under
+    // that database. Otherwise we will resolve the catalog/namespace and list tables there.
+    val namespace = if (sessionCatalog.databaseExists(dbName)) {
+      Seq(CatalogManager.SESSION_CATALOG_NAME, dbName)
     } else {
-      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
-      val plan = ShowTables(UnresolvedNamespace(ident), None)
-      val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
-      val tables = ret
-        .map(row => ident ++ Seq(row.getString(1)))
-        .map(makeTable)
-      CatalogImpl.makeDataset(tables, sparkSession)
+      parseIdent(dbName)
     }
-  }
+    val plan = ShowTables(UnresolvedNamespace(namespace), None)
+    val qe = sparkSession.sessionState.executePlan(plan)
+    val catalog = qe.analyzed.collectFirst {
+      case ShowTables(r: ResolvedNamespace, _, _) => r.catalog
+      case _: ShowTablesCommand =>
+        sparkSession.sessionState.catalogManager.v2SessionCatalog
+    }.get
+    val tables = qe.toRdd.collect().map { row =>
+      val tableName = row.getString(1)
+      val namespaceName = row.getString(0)
+      val isTemp = row.getBoolean(2)
+      if (isTemp) {
+        // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here.
+        val ns = if (namespaceName.isEmpty) Nil else Seq(namespaceName)
+        makeTable(ns :+ tableName)
+      } else {
+        val ns = parseIdent(namespaceName)
+        makeTable(catalog.name() +: ns :+ tableName)
+      }
+    }
+    CatalogImpl.makeDataset(tables, sparkSession)
+  }
+
+  private def makeTable(nameParts: Seq[String]): Table = {
+    sessionCatalog.lookupLocalOrGlobalRawTempView(nameParts).map { tempView =>
+      new Table(
+        name = tempView.tableMeta.identifier.table,
+        catalog = null,
+        namespace = tempView.tableMeta.identifier.database.toArray,
+        description = tempView.tableMeta.comment.orNull,
+        tableType = "TEMPORARY",
+        isTemporary = true)
+    }.getOrElse {
+      val plan = UnresolvedIdentifier(nameParts)
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case ResolvedIdentifier(catalog: TableCatalog, ident) =>
+          val tableOpt = try {
+            loadTable(catalog, ident)
+          } catch {
+            // Even if the table exits, error may still happen. For example, Spark can't read Hive's
+            // index table. We return a Table without description and tableType in this case.
+            case NonFatal(_) =>
+              Some(new Table(
+                name = ident.name(),
+                catalog = catalog.name(),
+                namespace = ident.namespace(),
+                description = null,
+                tableType = null,
+                isTemporary = false))
+          }
+          tableOpt.getOrElse(throw QueryCompilationErrors.tableOrViewNotFound(nameParts))
 
-  /**
-   * Returns a Table for the given table/view or temporary view.
-   *
-   * Note that this function requires the table already exists in the Catalog.
-   *
-   * If the table metadata retrieval failed due to any reason (e.g., table serde class
-   * is not accessible or the table type is not accepted by Spark SQL), this function
-   * still returns the corresponding Table without the description and tableType)
-   */
-  private def makeTable(tableIdent: TableIdentifier): Table = {
-    val metadata = try {
-      Some(sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent))
-    } catch {
-      case NonFatal(_) => None
+        case _ => throw QueryCompilationErrors.tableOrViewNotFound(nameParts)
+      }
     }
-    val isTemp = sessionCatalog.isTempView(tableIdent)
-    val qualifier =
-      metadata.map(_.identifier.database).getOrElse(tableIdent.database).map(Array(_)).orNull
-    new Table(
-      name = tableIdent.table,
-      catalog = CatalogManager.SESSION_CATALOG_NAME,
-      namespace = qualifier,
-      description = metadata.map(_.comment.orNull).orNull,
-      tableType = if (isTemp) "TEMPORARY" else metadata.map(_.tableType.name).orNull,
-      isTemporary = isTemp)
-  }
-
-  private def makeTable(ident: Seq[String]): Table = {
-    val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true)
-    val node = sparkSession.sessionState.executePlan(plan).analyzed
-    node match {
-      case t: ResolvedTable =>
-        val isExternal = t.table.properties().getOrDefault(
+  }
+
+  private def loadTable(catalog: TableCatalog, ident: Identifier): Option[Table] = {
+    // TODO: support v2 view when it gets implemented.
+    CatalogV2Util.loadTable(catalog, ident).map {
+      case v1: V1Table if v1.v1Table.tableType == CatalogTableType.VIEW =>
+        new Table(
+          name = v1.v1Table.identifier.table,
+          catalog = catalog.name(),
+          namespace = v1.v1Table.identifier.database.toArray,
+          description = v1.v1Table.comment.orNull,
+          tableType = "VIEW",
+          isTemporary = false)
+      case t: V2Table =>
+        val isExternal = t.properties().getOrDefault(
           TableCatalog.PROP_EXTERNAL, "false").equals("true")
         new Table(
-          name = t.identifier.name(),
-          catalog = t.catalog.name(),
-          namespace = t.identifier.namespace(),
-          description = t.table.properties().get("comment"),
+          name = ident.name(),
+          catalog = catalog.name(),
+          namespace = ident.namespace(),
+          description = t.properties().get("comment"),
           tableType =
             if (isExternal) CatalogTableType.EXTERNAL.name
             else CatalogTableType.MANAGED.name,
           isTemporary = false)
-      case v: ResolvedView =>
-        new Table(
-          name = v.identifier.name(),
-          catalog = null,
-          namespace = v.identifier.namespace(),
-          description = null,
-          tableType = if (v.isTemp) "TEMPORARY" else "VIEW",
-          isTemporary = v.isTemp)
-      case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
     }
   }
 
@@ -197,48 +212,37 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   override def listFunctions(dbName: String): Dataset[Function] = {
     // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
     // a qualified namespace with catalog name. We assume it's a single database name
-    // and check if we can find the dbName in sessionCatalog. If so we listFunctions under
-    // that database. Otherwise we try 3-part name parsing and locate the database.
-    if (sessionCatalog.databaseExists(dbName)) {
-      val functions = sessionCatalog.listFunctions(dbName)
-        .map { case (functIdent, _) => makeFunction(functIdent) }
-      CatalogImpl.makeDataset(functions, sparkSession)
+    // and check if we can find it in the sessionCatalog. If so we list functions under
+    // that database. Otherwise we will resolve the catalog/namespace and list functions there.
+    val namespace = if (sessionCatalog.databaseExists(dbName)) {
+      Seq(CatalogManager.SESSION_CATALOG_NAME, dbName)
     } else {
-      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
-      val functions = collection.mutable.ArrayBuilder.make[Function]
-
-      // built-in functions
-      val plan0 = ShowFunctions(UnresolvedNamespace(ident),
-        userScope = false, systemScope = true, None)
-      sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row =>
-        // `lookupBuiltinOrTempFunction` and `lookupBuiltinOrTempTableFunction` in Analyzer
-        // require the input identifier only contains the function name, otherwise, built-in
-        // functions will be skipped.
-        val name = row.getString(0)
-        functions += makeFunction(Seq(name))
-      }
-
-      // user functions
-      val plan1 = ShowFunctions(UnresolvedNamespace(ident),
-        userScope = true, systemScope = false, None)
-      sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row =>
-        // `row.getString(0)` may contain dbName like `db.function`, so extract the function name.
-        val name = row.getString(0).split("\\.").last
-        functions += makeFunction(ident :+ name)
-      }
+      parseIdent(dbName)
+    }
+    val functions = collection.mutable.ArrayBuilder.make[Function]
+
+    // TODO: The SHOW FUNCTIONS should tell us the function type (built-in, temp, persistent) and
+    //       we can simply the code below quite a bit. For now we need to list built-in functions
+    //       separately as several built-in function names are not parsable, such as `!=`.
+
+    // List built-in functions. We don't need to specify the namespace here as SHOW FUNCTIONS with
+    // only system scope does not need to know the catalog and namespace.
+    val plan0 = ShowFunctions(UnresolvedNamespace(Nil), userScope = false, systemScope = true, None)
+    sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row =>
+      // Built-in functions do not belong to any catalog or namespace. We can only look it up with
+      // a single part name.
+      val name = row.getString(0)
+      functions += makeFunction(Seq(name))
+    }
 
-      CatalogImpl.makeDataset(functions.result(), sparkSession)
+    // List user functions.
+    val plan1 = ShowFunctions(UnresolvedNamespace(namespace),
+      userScope = true, systemScope = false, None)
+    sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row =>
+      functions += makeFunction(parseIdent(row.getString(0)))
     }
-  }
 
-  private def makeFunction(funcIdent: FunctionIdentifier): Function = {
-    val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
-    new Function(
-      name = metadata.getName,
-      database = metadata.getDb,
-      description = null, // for now, this is always undefined
-      className = metadata.getClassName,
-      isTemporary = metadata.getDb == null)
+    CatalogImpl.makeDataset(functions.result(), sparkSession)
   }
 
   private def makeFunction(ident: Seq[String]): Function = {
@@ -279,23 +283,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("table does not exist")
   override def listColumns(tableName: String): Dataset[Column] = {
-    // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name
-    // and optionally contains a database name(thus a TableIdentifier), then we look up the table in
-    // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of
-    // string as the qualified identifier and resolve the table through SQL analyzer.
-    try {
-      val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-      if (tableExists(ident.database.orNull, ident.table)) {
-        listColumns(ident)
-      } else {
-        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
-        listColumns(ident)
-      }
-    } catch {
-      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
-        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
-        listColumns(ident)
+    val parsed = parseIdent(tableName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in
+    // the Hive Metastore first.
+    val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) &&
+      sessionCatalog.tableExists(parsed.asTableIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
     }
+    listColumns(nameParts)
   }
 
   /**
@@ -303,25 +300,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("database or table does not exist")
   override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
-    requireTableExists(dbName, tableName)
-    listColumns(TableIdentifier(tableName, Some(dbName)))
-  }
-
-  private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
-
-    val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
-    val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
-    val columns = tableMetadata.schema.map { c =>
-      new Column(
-        name = c.name,
-        description = c.getComment().orNull,
-        dataType = CharVarcharUtils.getRawType(c.metadata).getOrElse(c.dataType).catalogString,
-        nullable = c.nullable,
-        isPartition = partitionColumnNames.contains(c.name),
-        isBucket = bucketColumnNames.contains(c.name))
-    }
-    CatalogImpl.makeDataset(columns, sparkSession)
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the
+    // Hive Metastore.
+    listColumns(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName))
   }
 
   private def listColumns(ident: Seq[String]): Dataset[Column] = {
@@ -361,41 +342,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     CatalogImpl.makeDataset(columns, sparkSession)
   }
 
+  private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match {
+    case catalog: SupportsNamespaces =>
+      val metadata = catalog.loadNamespaceMetadata(ns.toArray)
+      new Database(
+        name = ns.quoted,
+        catalog = catalog.name,
+        description = metadata.get(SupportsNamespaces.PROP_COMMENT),
+        locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
+    // If the catalog doesn't support namespaces, we assume it's an implicit namespace, which always
+    // exists but has no metadata.
+    case catalog: CatalogPlugin =>
+      new Database(
+        name = ns.quoted,
+        catalog = catalog.name,
+        description = null,
+        locationUri = null)
+    case _ => new Database(name = ns.quoted, description = null, locationUri = null)
+  }
 
   /**
    * Gets the database with the specified name. This throws an `AnalysisException` when no
    * `Database` can be found.
    */
   override def getDatabase(dbName: String): Database = {
-    // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or a
-    // qualified namespace with catalog name. To maintain backwards compatibility, we first assume
-    // it's a single database name and return the database from sessionCatalog if it exists.
-    // Otherwise we try 3-part name parsing and locate the database. If the parased identifier
-    // contains both catalog name and database name, we then search the database in the catalog.
-    if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
-      makeDatabase(dbName)
+    // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
+    // a qualified namespace with catalog name. We assume it's a single database name
+    // and check if we can find it in the sessionCatalog. Otherwise we will parse `dbName` and
+    // resolve catalog/namespace with it.
+    val namespace = if (sessionCatalog.databaseExists(dbName)) {
+      Seq(CatalogManager.SESSION_CATALOG_NAME, dbName)
     } else {
-      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
-      val plan = UnresolvedNamespace(ident)
-      val resolved = sparkSession.sessionState.executePlan(plan).analyzed
-      resolved match {
-        case ResolvedNamespace(catalog: SupportsNamespaces, namespace) =>
-          val metadata = catalog.loadNamespaceMetadata(namespace.toArray)
-          new Database(
-            name = namespace.quoted,
-            catalog = catalog.name,
-            description = metadata.get(SupportsNamespaces.PROP_COMMENT),
-            locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
-        // similar to databaseExists: if the catalog doesn't support namespaces, we assume it's an
-        // implicit namespace, which exists but has no metadata.
-        case ResolvedNamespace(catalog: CatalogPlugin, namespace) =>
-          new Database(
-            name = namespace.quoted,
-            catalog = catalog.name,
-            description = null,
-            locationUri = null)
-        case _ => new Database(name = dbName, description = null, locationUri = null)
-      }
+      sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+    }
+    val plan = UnresolvedNamespace(namespace)
+    sparkSession.sessionState.executePlan(plan).analyzed match {
+      case ResolvedNamespace(catalog, namespace) =>
+        getNamespace(catalog, namespace)
+      case _ => new Database(name = dbName, description = null, locationUri = null)
     }
   }
 
@@ -404,26 +388,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * table/view. This throws an `AnalysisException` when no `Table` can be found.
    */
   override def getTable(tableName: String): Table = {
-    // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name
-    // and optionally contains a database name(thus a TableIdentifier), then we look up the table in
-    // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of
-    // string as the qualified identifier and resolve the table through SQL analyzer.
-    try {
-      val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-      if (tableExists(ident.database.orNull, ident.table)) {
-        makeTable(ident)
-      } else {
-        getTable3LNamespace(tableName)
-      }
-    } catch {
-      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
-        getTable3LNamespace(tableName)
+    val parsed = parseIdent(tableName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in
+    // the Hive Metastore first.
+    val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) &&
+      sessionCatalog.tableExists(parsed.asTableIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
     }
-  }
-
-  private def getTable3LNamespace(tableName: String): Table = {
-    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
-    makeTable(ident)
+    makeTable(nameParts)
   }
 
   /**
@@ -431,10 +405,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * `AnalysisException` when no `Table` can be found.
    */
   override def getTable(dbName: String, tableName: String): Table = {
-    if (tableExists(dbName, tableName)) {
-      makeTable(TableIdentifier(tableName, Option(dbName)))
+    if (sessionCatalog.isGlobalTempViewDB(dbName)) {
+      makeTable(Seq(dbName, tableName))
     } else {
-      throw QueryCompilationErrors.tableOrViewNotFoundInDatabaseError(tableName, dbName)
+      // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the
+      // Hive Metastore.
+      makeTable(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName))
     }
   }
 
@@ -443,19 +419,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * function. This throws an `AnalysisException` when no `Function` can be found.
    */
   override def getFunction(functionName: String): Function = {
-    // calling `sqlParser.parseFunctionIdentifier` to parse functionName. If it contains only
-    // function name and optionally contains a database name(thus a FunctionIdentifier), then
-    // we look up the function in sessionCatalog.
-    // Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of string as
-    // the qualified identifier and resolve the function through SQL analyzer.
-    try {
-      val ident = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName)
-      getFunction(ident.database.orNull, ident.funcName)
-    } catch {
-      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
-        val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(functionName)
-        makeFunction(ident)
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first.
+    val nameParts = if (parsed.length <= 2 &&
+      !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+      sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+      qualifyV1Ident(parsed)
+    } else {
+      parsed
     }
+    makeFunction(nameParts)
   }
 
   /**
@@ -463,7 +437,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * found.
    */
   override def getFunction(dbName: String, functionName: String): Function = {
-    makeFunction(FunctionIdentifier(functionName, Option(dbName)))
+    // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+    // the Hive Metastore.
+    makeFunction(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
   }
 
   /**
@@ -471,15 +447,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   override def databaseExists(dbName: String): Boolean = {
     // To maintain backwards compatibility, we first treat the input is a simple dbName and check
-    // if sessionCatalog contains it. If no, we try to parse it as 3 part name. If the parased
-    // identifier contains both catalog name and database name, we then search the database in the
-    // catalog.
+    // if sessionCatalog contains it. If no, we try to parse it, resolve catalog and namespace,
+    // and check if namespace exists in the catalog.
     if (!sessionCatalog.databaseExists(dbName)) {
-      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
-      val plan = sparkSession.sessionState.executePlan(UnresolvedNamespace(ident)).analyzed
-      plan match {
-        case ResolvedNamespace(catalog: SupportsNamespaces, _) =>
-          catalog.namespaceExists(ident.slice(1, ident.size).toArray)
+      val plan = UnresolvedNamespace(parseIdent(dbName))
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case ResolvedNamespace(catalog: SupportsNamespaces, ns) =>
+          catalog.namespaceExists(ns.toArray)
         case _ => true
       }
     } else {
@@ -492,11 +466,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * view or a table/view.
    */
   override def tableExists(tableName: String): Boolean = {
-    try {
-      getTable(tableName)
-      true
-    } catch {
-      case e: AnalysisException => false
+    val parsed = parseIdent(tableName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in
+    // the Hive Metastore first. This also checks if it's a temp view.
+    (parsed.length <= 2 && {
+      val v1Ident = parsed.asTableIdentifier
+      sessionCatalog.isTempView(v1Ident) || sessionCatalog.tableExists(v1Ident)
+    }) || {
+      val plan = UnresolvedIdentifier(parsed)
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case ResolvedIdentifier(catalog: TableCatalog, ident) => catalog.tableExists(ident)
+        case _ => false
+      }
     }
   }
 
@@ -513,22 +494,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * or a function.
    */
   override def functionExists(functionName: String): Boolean = {
-    try {
-      val ident = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName)
-      functionExists(ident.database.orNull, ident.funcName)
-    } catch {
-      case e: org.apache.spark.sql.catalyst.parser.ParseException =>
-        try {
-          val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(functionName)
-          val plan = UnresolvedFunc(ident, "Catalog.functionExists", false, None)
-          sparkSession.sessionState.executePlan(plan).analyzed match {
-            case _: ResolvedPersistentFunc => true
-            case _: ResolvedNonPersistentFunc => true
-            case _ => false
-          }
-        } catch {
-          case _: org.apache.spark.sql.AnalysisException => false
-        }
+    val parsed = parseIdent(functionName)
+    // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+    // the Hive Metastore first. This also checks if it's a built-in/temp function.
+    (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
+      val plan = UnresolvedIdentifier(parsed)
+      sparkSession.sessionState.executePlan(plan).analyzed match {
+        case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
+        case _ => false
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
index 8a635807abb..1a9baecfa74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalog.Table
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint}
-import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
@@ -166,7 +165,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSparkSession {
       assert(spark.catalog.tableExists(globalTempDB, "src"))
       assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
         name = "src",
-        catalog = CatalogManager.SESSION_CATALOG_NAME,
+        catalog = null,
         namespace = Array(globalTempDB),
         description = null,
         tableType = "TEMPORARY",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 0de48325d98..ab26a4fcc35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -182,6 +182,31 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table2"))
   }
 
+  test("SPARK-39828: Catalog.listTables() should respect currentCatalog") {
+    assert(spark.catalog.currentCatalog() == "spark_catalog")
+    assert(spark.catalog.listTables().collect().isEmpty)
+    createTable("my_table1")
+    assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table1"))
+
+    val catalogName = "testcat"
+    val dbName = "my_db"
+    val tableName = "my_table2"
+    val tableSchema = new StructType().add("i", "int")
+    val description = "this is a test managed table"
+    sql(s"CREATE NAMESPACE $catalogName.$dbName")
+
+    spark.catalog.setCurrentCatalog("testcat")
+    spark.catalog.setCurrentDatabase("my_db")
+    assert(spark.catalog.listTables().collect().isEmpty)
+
+    createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
+      Map.empty[String, String], description)
+    assert(spark.catalog.listTables()
+      .collect()
+      .map(t => Array(t.catalog, t.namespace.mkString("."), t.name).mkString(".")).toSet ==
+      Set("testcat.my_db.my_table2"))
+  }
+
   test("list tables with database") {
     assert(spark.catalog.listTables("default").collect().isEmpty)
     createDatabase("my_db1")
@@ -229,6 +254,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(!funcNames2.contains("my_temp_func"))
   }
 
+  test("SPARK-39828: Catalog.listFunctions() should respect currentCatalog") {
+    assert(spark.catalog.currentCatalog() == "spark_catalog")
+    assert(Set("+", "current_database", "window").subsetOf(
+      spark.catalog.listFunctions().collect().map(_.name).toSet))
+    createFunction("my_func")
+    assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
+
+    sql(s"CREATE NAMESPACE testcat.ns")
+    spark.catalog.setCurrentCatalog("testcat")
+    spark.catalog.setCurrentDatabase("ns")
+
+    val funcCatalog = spark.sessionState.catalogManager.catalog("testcat")
+      .asInstanceOf[InMemoryCatalog]
+    val function: UnboundFunction = new UnboundFunction {
+      override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {
+        override def inputTypes(): Array[DataType] = Array(IntegerType)
+        override def resultType(): DataType = IntegerType
+        override def name(): String = "my_bound_function"
+      }
+      override def description(): String = "my_function"
+      override def name(): String = "my_function"
+    }
+    assert(!spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
+    funcCatalog.createFunction(Identifier.of(Array("ns"), "my_func"), function)
+    assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
+  }
+
   test("list functions with database") {
     assert(Set("+", "current_database", "window").subsetOf(
       spark.catalog.listFunctions().collect().map(_.name).toSet))
@@ -283,7 +335,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     testListColumns("tab1", dbName = Some("db1"))
   }
 
-  test("SPARK-39615: three layer namespace compatibility - listColumns") {
+  test("SPARK-39615: qualified name with catalog - listColumns") {
     val answers = Map(
       "col1" -> ("int", true, false, true),
       "col2" -> ("string", true, false, false),
@@ -637,7 +689,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(errMsg.contains("my_temp_table is a temp view. 'recoverPartitions()' expects a table"))
   }
 
-  test("three layer namespace compatibility - create managed table") {
+  test("qualified name with catalog - create managed table") {
     val catalogName = "testcat"
     val dbName = "my_db"
     val tableName = "my_table"
@@ -656,7 +708,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(table.properties().get("comment").equals(description))
   }
 
-  test("three layer namespace compatibility - create external table") {
+  test("qualified name with catalog - create external table") {
     withTempDir { dir =>
       val catalogName = "testcat"
       val dbName = "my_db"
@@ -680,7 +732,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     }
   }
 
-  test("three layer namespace compatibility - list tables") {
+  test("qualified name with catalog - list tables") {
     withTempDir { dir =>
       val catalogName = "testcat"
       val dbName = "my_db"
@@ -729,7 +781,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
       Set("my_table1", "my_table2", "my_temp_table"))
   }
 
-  test("three layer namespace compatibility - get table") {
+  test("qualified name with catalog - get table") {
     val catalogName = "testcat"
     val dbName = "default"
     val tableName = "my_table"
@@ -757,7 +809,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME)
   }
 
-  test("three layer namespace compatibility - table exists") {
+  test("qualified name with catalog - table exists") {
     val catalogName = "testcat"
     val dbName = "my_db"
     val tableName = "my_table"
@@ -781,7 +833,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
   }
 
-  test("three layer namespace compatibility - database exists") {
+  test("qualified name with catalog - database exists") {
     val catalogName = "testcat"
     val dbName = "my_db"
     assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
@@ -793,7 +845,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString(".")))
   }
 
-  test("SPARK-39506: three layer namespace compatibility - cache table, isCached and" +
+  test("SPARK-39506: qualified name with catalog - cache table, isCached and" +
     "uncacheTable") {
     val tableSchema = new StructType().add("i", "int")
     createTable("my_table", "my_db", "testcat", classOf[FakeV2Provider].getName,
@@ -840,7 +892,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     }
   }
 
-  test("three layer namespace compatibility - get database") {
+  test("qualified name with catalogy - get database") {
     val catalogsAndDatabases =
       Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", "somedb"))
     catalogsAndDatabases.foreach { case (catalog, dbName) =>
@@ -863,7 +915,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     intercept[AnalysisException](spark.catalog.getDatabase("randomcat.db10"))
   }
 
-  test("three layer namespace compatibility - get database, same in hive and testcat") {
+  test("qualified name with catalog - get database, same in hive and testcat") {
     // create 'testdb' in hive and testcat
     val dbName = "testdb"
     sql(s"CREATE NAMESPACE spark_catalog.$dbName COMMENT 'hive database'")
@@ -883,7 +935,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(spark.catalog.getDatabase(qualified).name === db)
   }
 
-  test("three layer namespace compatibility - set current database") {
+  test("qualified name with catalog - set current database") {
     spark.catalog.setCurrentCatalog("testcat")
     // namespace with the same name as catalog
     sql("CREATE NAMESPACE testcat.testcat.my_db")
@@ -912,8 +964,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     assert(e3.contains("unknown_db"))
   }
 
-  test("SPARK-39579: Three layer namespace compatibility - " +
-    "listFunctions, getFunction, functionExists") {
+  test("SPARK-39579: qualified name with catalog - listFunctions, getFunction, functionExists") {
     createDatabase("my_db1")
     createFunction("my_func1", Some("my_db1"))
 
@@ -931,8 +982,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     val func1b = spark.catalog.getFunction("spark_catalog.my_db1.my_func1")
     assert(func1a.name === func1b.name && func1a.namespace === func1b.namespace &&
       func1a.className === func1b.className && func1a.isTemporary === func1b.isTemporary)
-    assert(func1a.catalog === null && func1b.catalog === "spark_catalog")
-    assert(func1a.description === null && func1b.description === "N/A.")
+    assert(func1a.catalog === "spark_catalog" && func1b.catalog === "spark_catalog")
+    assert(func1a.description === "N/A." && func1b.description === "N/A.")
 
     val function: UnboundFunction = new UnboundFunction {
       override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org