Posted to commits@spark.apache.org by we...@apache.org on 2022/06/09 03:08:17 UTC

[spark] branch master updated: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cb55efadea1 [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cb55efadea1 is described below

commit cb55efadea1399e1ce6daae5d9ec7896ffce1b93
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Jun 9 11:08:00 2022 +0800

    [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
    
    ### What changes were proposed in this pull request?
    
    1. Change the `CreateTable` API to support a 3-layer namespace.
    2. Change the `ListTables` API so that a) it still accepts a single `database` name, and b) if that database does not exist, it further checks whether the parameter is a qualified `catalog.database` name (see the sketch below).
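    
    For illustration, a minimal usage sketch of the new behavior (the catalog/database/table names and the `parquet` source are placeholders; the added `CatalogSuite` tests register an in-memory test catalog and a fake v2 provider instead):
    
    ```scala
    import org.apache.spark.sql.types.StructType
    
    // Assumes a catalog named `testcat` has been registered, e.g. via
    // spark.conf.set("spark.sql.catalog.testcat", <some TableCatalog implementation>).
    spark.catalog.createTable(
      tableName = "testcat.my_db.my_table",   // 3-layer name: catalog.database.table
      source = "parquet",
      schema = new StructType().add("i", "int"),
      description = "example table",
      options = Map.empty[String, String])
    
    // The argument is first tried as a plain database name in the session catalog;
    // if no such database exists, it is re-parsed as a multi-part `catalog.database` name.
    spark.catalog.listTables("testcat.my_db").show()
    ```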
    
    ### Why are the changes needed?
    
    `CreateTable` and `ListTables` do not support the 3-layer namespace.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The API change is backward compatible, and it extends these APIs to support the 3-layer namespace (e.g. `catalog.database.table`).
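    
    For example, a `Table` row returned by `listTables` now carries `catalog` and `namespace` fields, while `database` remains available as a derived field (illustrative sketch; names are placeholders and match the sketch above):
    
    ```scala
    val t = spark.catalog.listTables("testcat.my_db").head()
    t.name       // "my_table"
    t.catalog    // "testcat"
    t.namespace  // Array("my_db")
    t.database   // "my_db"; non-null only when the namespace has exactly one part
    ```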
    
    ### How was this patch tested?
    
    Unit tests (new and updated cases in `CatalogSuite`, plus updated R, global temp view, and Hive suites).
    
    Closes #36586 from amaliujia/catalogapi.
    
    Authored-by: Rui Wang <ru...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 R/pkg/tests/fulltests/test_sparkSQL.R              |   5 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |   4 +
 .../spark/sql/errors/QueryCompilationErrors.scala  |   4 +
 .../org/apache/spark/sql/catalog/interface.scala   |  26 +++-
 .../apache/spark/sql/internal/CatalogImpl.scala    |  92 +++++++++++---
 .../spark/sql/execution/GlobalTempViewSuite.scala  |   4 +-
 .../apache/spark/sql/internal/CatalogSuite.scala   | 140 ++++++++++++++++++++-
 .../spark/sql/hive/MetastoreDataSourcesSuite.scala |   2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala    |   4 +-
 9 files changed, 251 insertions(+), 30 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index df1094bacef..f0abc96613d 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -663,7 +663,7 @@ test_that("test tableNames and tables", {
   expect_equal(count(tables), count + 1)
   expect_equal(count(tables()), count(tables))
   expect_true("tableName" %in% colnames(tables()))
-  expect_true(all(c("tableName", "database", "isTemporary") %in% colnames(tables())))
+  expect_true(all(c("tableName", "namespace", "isTemporary") %in% colnames(tables())))
 
   suppressWarnings(registerTempTable(df, "table2"))
   tables <- listTables()
@@ -4026,7 +4026,8 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", {
   tb <- listTables()
   count <- count(tables())
   expect_equal(nrow(tb), count)
-  expect_equal(colnames(tb), c("name", "database", "description", "tableType", "isTemporary"))
+  expect_equal(colnames(tb),
+               c("name", "catalog", "namespace", "description", "tableType", "isTemporary"))
 
   createOrReplaceTempView(as.DataFrame(cars), "cars")
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d6c80f98bf7..0152f49c798 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -965,6 +965,10 @@ class SessionCatalog(
     isTempView(nameParts.asTableIdentifier)
   }
 
+  def isGlobalTempViewDB(dbName: String): Boolean = {
+    globalTempViewManager.database.equals(dbName)
+  }
+
   def lookupTempView(name: TableIdentifier): Option[View] = {
     val tableName = formatTableName(name.table)
     if (name.database.isEmpty) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 551eaa6aeb7..68f4320ff67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2188,6 +2188,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
     new AnalysisException(s"Table or view '$tableName' not found in database '$dbName'")
   }
 
+  def tableOrViewNotFound(ident: Seq[String]): Throwable = {
+    new AnalysisException(s"Table or view '${ident.quoted}' not found")
+  }
+
   def unexpectedTypeOfRelationError(relation: LogicalPlan, tableName: String): Throwable = {
     new AnalysisException(
       s"Unexpected type ${relation.getClass.getCanonicalName} of the relation $tableName")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
index cb270875228..1e4e0b14745 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -55,7 +55,8 @@ class Database(
  * A table in Spark, as returned by the `listTables` method in [[Catalog]].
  *
  * @param name name of the table.
- * @param database name of the database the table belongs to.
+ * @param catalog name of the catalog that the table belongs to.
+ * @param namespace the namespace that the table belongs to.
  * @param description description of the table.
  * @param tableType type of the table (e.g. view, table).
  * @param isTemporary whether the table is a temporary table.
@@ -64,15 +65,36 @@ class Database(
 @Stable
 class Table(
     val name: String,
-    @Nullable val database: String,
+    @Nullable val catalog: String,
+    @Nullable val namespace: Array[String],
     @Nullable val description: String,
     val tableType: String,
     val isTemporary: Boolean)
   extends DefinedByConstructorParams {
 
+  def this(
+      name: String,
+      database: String,
+      description: String,
+      tableType: String,
+      isTemporary: Boolean) = {
+    this(name, null, Array(database), description, tableType, isTemporary)
+  }
+
+  def database: String = {
+    if (namespace == null) {
+      null
+    } else if (namespace.length == 1) {
+      namespace(0)
+    } else {
+      null
+    }
+  }
+
   override def toString: String = {
     "Table[" +
       s"name='$name', " +
+      Option(catalog).map { d => s"catalog='$d', " }.getOrElse("") +
       Option(database).map { d => s"database='$d', " }.getOrElse("") +
       Option(description).map { d => s"description='$d', " }.getOrElse("") +
       s"tableType='$tableType', " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f2057285030..4b6ea33f3e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,13 +23,14 @@ import scala.util.control.NonFatal
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
+import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RecoverPartitions, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
 
@@ -97,8 +98,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    val tables = sessionCatalog.listTables(dbName).map(makeTable)
-    CatalogImpl.makeDataset(tables, sparkSession)
+    // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
+    // a qualified namespace with catalog name. We assume it's a single database name
+    // and check if we can find the dbName in sessionCatalog. If so we listTables under
+    // that database. Otherwise we try 3-part name parsing and locate the database.
+    if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
+      val tables = sessionCatalog.listTables(dbName).map(makeTable)
+      CatalogImpl.makeDataset(tables, sparkSession)
+    } else {
+      val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+      val plan = ShowTables(UnresolvedNamespace(ident), None)
+      val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+      val tables = ret
+        .map(row => ident ++ Seq(row.getString(1)))
+        .map(makeTable)
+      CatalogImpl.makeDataset(tables, sparkSession)
+    }
   }
 
   /**
@@ -117,14 +132,45 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       case NonFatal(_) => None
     }
     val isTemp = sessionCatalog.isTempView(tableIdent)
+    val qualifier =
+      metadata.map(_.identifier.database).getOrElse(tableIdent.database).map(Array(_)).orNull
     new Table(
       name = tableIdent.table,
-      database = metadata.map(_.identifier.database).getOrElse(tableIdent.database).orNull,
+      catalog = CatalogManager.SESSION_CATALOG_NAME,
+      namespace = qualifier,
       description = metadata.map(_.comment.orNull).orNull,
       tableType = if (isTemp) "TEMPORARY" else metadata.map(_.tableType.name).orNull,
       isTemporary = isTemp)
   }
 
+  private def makeTable(ident: Seq[String]): Table = {
+    val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true)
+    val node = sparkSession.sessionState.executePlan(plan).analyzed
+    node match {
+      case t: ResolvedTable =>
+        val isExternal = t.table.properties().getOrDefault(
+          TableCatalog.PROP_EXTERNAL, "false").equals("true")
+        new Table(
+          name = t.identifier.name(),
+          catalog = t.catalog.name(),
+          namespace = t.identifier.namespace(),
+          description = t.table.properties().get("comment"),
+          tableType =
+            if (isExternal) CatalogTableType.EXTERNAL.name
+            else CatalogTableType.MANAGED.name,
+          isTemporary = false)
+      case v: ResolvedView =>
+        new Table(
+          name = v.identifier.name(),
+          catalog = null,
+          namespace = v.identifier.namespace(),
+          description = null,
+          tableType = if (v.isTemp) "TEMPORARY" else "VIEW",
+          isTemporary = v.isTemp)
+      case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
+    }
+  }
+
   /**
    * Returns a list of functions registered in the current database.
    * This includes all temporary functions
@@ -367,24 +413,38 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       schema: StructType,
       description: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
     val storage = DataSource.buildStorageFormatFromOptions(options)
     val tableType = if (storage.locationUri.isDefined) {
       CatalogTableType.EXTERNAL
     } else {
       CatalogTableType.MANAGED
     }
-    val tableDesc = CatalogTable(
-      identifier = tableIdent,
-      tableType = tableType,
-      storage = storage,
-      schema = schema,
+    val location = if (storage.locationUri.isDefined) {
+      val locationStr = storage.locationUri.get.toString
+      Some(locationStr)
+    } else {
+      None
+    }
+
+    val tableSpec = TableSpec(
+      properties = Map(),
       provider = Some(source),
-      comment = { if (description.isEmpty) None else Some(description) }
-    )
-    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+      options = options,
+      location = location,
+      comment = { if (description.isEmpty) None else Some(description) },
+      serde = None,
+      external = tableType == CatalogTableType.EXTERNAL)
+
+    val plan = CreateTable(
+      name = UnresolvedDBObjectName(ident, isNamespace = false),
+      tableSchema = schema,
+      partitioning = Seq(),
+      tableSpec = tableSpec,
+      ignoreIfExists = false)
+
     sparkSession.sessionState.executePlan(plan).toRdd
-    sparkSession.table(tableIdent)
+    sparkSession.table(tableName)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
index 28e82aa14e0..8a635807abb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalog.Table
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint}
+import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
@@ -165,7 +166,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSparkSession {
       assert(spark.catalog.tableExists(globalTempDB, "src"))
       assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
         name = "src",
-        database = globalTempDB,
+        catalog = CatalogManager.SESSION_CATALOG_NAME,
+        namespace = Array(globalTempDB),
         description = null,
         tableType = "TEMPORARY",
         isTemporary = true).toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index edd96e83379..06db60676ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal
 
 import java.io.File
 
+import org.scalatest.BeforeAndAfter
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
@@ -26,6 +28,9 @@ import org.apache.spark.sql.catalyst.analysis.AnalysisTest
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.connector.FakeV2Provider
+import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.StorageLevel
@@ -34,7 +39,7 @@ import org.apache.spark.storage.StorageLevel
 /**
  * Tests for the user-facing [[org.apache.spark.sql.catalog.Catalog]].
  */
-class CatalogSuite extends SharedSparkSession with AnalysisTest {
+class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAfter {
   import testImplicits._
 
   private def sessionCatalog: SessionCatalog = spark.sessionState.catalog
@@ -114,6 +119,15 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
     }
   }
 
+  before {
+    spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.clear()
+  }
+
   test("current database") {
     assert(spark.catalog.currentDatabase == "default")
     assert(sessionCatalog.getCurrentDatabase == "default")
@@ -261,8 +275,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
   }
 
   test("Table.toString") {
-    assert(new Table("volley", "databasa", "one", "world", isTemporary = true).toString ==
-      "Table[name='volley', database='databasa', description='one', " +
+    assert(new Table("volley", null, Array("databasa"), "one", "world", isTemporary = true).toString
+      == "Table[name='volley', database='databasa', description='one', " +
         "tableType='world', isTemporary='true']")
     assert(new Table("volley", null, null, "world", isTemporary = true).toString ==
       "Table[name='volley', tableType='world', isTemporary='true']")
@@ -290,7 +304,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
 
   test("catalog classes format in Dataset.show") {
     val db = new Database("nama", "descripta", "locata")
-    val table = new Table("nama", "databasa", "descripta", "typa", isTemporary = false)
+    val table = new Table("nama", "cataloa", Array("databasa"), "descripta", "typa",
+      isTemporary = false)
     val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
     val column = new Column(
       "nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
@@ -299,7 +314,9 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
     val functionFields = ScalaReflection.getConstructorParameterValues(function)
     val columnFields = ScalaReflection.getConstructorParameterValues(column)
     assert(dbFields == Seq("nama", "descripta", "locata"))
-    assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
+    assert(Seq(tableFields(0), tableFields(1), tableFields(3), tableFields(4), tableFields(5)) ==
+      Seq("nama", "cataloa", "descripta", "typa", false))
+    assert(tableFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
     assert(functionFields == Seq("nama", "databasa", "descripta", "classa", false))
     assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
     val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
@@ -307,7 +324,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
     val functionString = CatalogImpl.makeDataset(Seq(function), spark).showString(10)
     val columnString = CatalogImpl.makeDataset(Seq(column), spark).showString(10)
     dbFields.foreach { f => assert(dbString.contains(f.toString)) }
-    tableFields.foreach { f => assert(tableString.contains(f.toString)) }
+    tableFields.foreach { f => assert(tableString.contains(f.toString) ||
+      tableString.contains(f.asInstanceOf[Array[String]].mkString(""))) }
     functionFields.foreach { f => assert(functionString.contains(f.toString)) }
     columnFields.foreach { f => assert(columnString.contains(f.toString)) }
   }
@@ -553,4 +571,114 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
     }.getMessage
     assert(errMsg.contains("my_temp_table is a temp view. 'recoverPartitions()' expects a table"))
   }
+
+  test("three layer namespace compatibility - create managed table") {
+    val catalogName = "testcat"
+    val dbName = "my_db"
+    val tableName = "my_table"
+    val tableSchema = new StructType().add("i", "int")
+    val description = "this is a test table"
+
+    val df = spark.catalog.createTable(
+      tableName = Array(catalogName, dbName, tableName).mkString("."),
+      source = classOf[FakeV2Provider].getName,
+      schema = tableSchema,
+      description = description,
+      options = Map.empty[String, String])
+    assert(df.schema.equals(tableSchema))
+
+    val testCatalog =
+      spark.sessionState.catalogManager.catalog(catalogName).asTableCatalog
+    val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
+    assert(table.schema().equals(tableSchema))
+    assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
+    assert(table.properties().get("comment").equals(description))
+  }
+
+  test("three layer namespace compatibility - create external table") {
+    withTempDir { dir =>
+      val catalogName = "testcat"
+      val dbName = "my_db"
+      val tableName = "my_table"
+      val tableSchema = new StructType().add("i", "int")
+      val description = "this is a test table"
+
+      val df = spark.catalog.createTable(
+        tableName = Array(catalogName, dbName, tableName).mkString("."),
+        source = classOf[FakeV2Provider].getName,
+        schema = tableSchema,
+        description = description,
+        options = Map("path" -> dir.getAbsolutePath))
+      assert(df.schema.equals(tableSchema))
+
+      val testCatalog =
+        spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
+      val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
+      assert(table.schema().equals(tableSchema))
+      assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
+      assert(table.properties().get("comment").equals(description))
+      assert(table.properties().get("path").equals(dir.getAbsolutePath))
+      assert(table.properties().get("external").equals("true"))
+      assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
+    }
+  }
+
+  test("three layer namespace compatibility - list tables") {
+    withTempDir { dir =>
+      val catalogName = "testcat"
+      val dbName = "my_db"
+      val tableName = "my_table"
+      val tableSchema = new StructType().add("i", "int")
+      val description = "this is a test managed table"
+
+      spark.catalog.createTable(
+        tableName = Array(catalogName, dbName, tableName).mkString("."),
+        source = classOf[FakeV2Provider].getName,
+        schema = tableSchema,
+        description = description,
+        options = Map.empty[String, String])
+
+      val tableName2 = "my_table2"
+      val description2 = "this is a test external table"
+
+      spark.catalog.createTable(
+        tableName = Array(catalogName, dbName, tableName2).mkString("."),
+        source = classOf[FakeV2Provider].getName,
+        schema = tableSchema,
+        description = description2,
+        options = Map("path" -> dir.getAbsolutePath))
+
+      val tables = spark.catalog.listTables("testcat.my_db").collect()
+      assert(tables.size == 2)
+
+      val expectedTable1 =
+        new Table(tableName, catalogName, Array(dbName), description,
+          CatalogTableType.MANAGED.name, false)
+      assert(tables.exists(t =>
+        expectedTable1.name.equals(t.name) && expectedTable1.database.equals(t.database) &&
+        expectedTable1.description.equals(t.description) &&
+        expectedTable1.tableType.equals(t.tableType) &&
+        expectedTable1.isTemporary == t.isTemporary))
+
+      val expectedTable2 =
+        new Table(tableName2, catalogName, Array(dbName), description2,
+          CatalogTableType.EXTERNAL.name, false)
+      assert(tables.exists(t =>
+        expectedTable2.name.equals(t.name) && expectedTable2.database.equals(t.database) &&
+        expectedTable2.description.equals(t.description) &&
+        expectedTable2.tableType.equals(t.tableType) &&
+        expectedTable2.isTemporary == t.isTemporary))
+    }
+  }
+
+  test("list tables when there is `default` catalog") {
+    spark.conf.set("spark.sql.catalog.default", classOf[InMemoryCatalog].getName)
+
+    assert(spark.catalog.listTables("default").collect().isEmpty)
+    createTable("my_table1")
+    createTable("my_table2")
+    createTempTable("my_temp_table")
+    assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
+      Set("my_table1", "my_table2", "my_temp_table"))
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 16b5d6cf1bf..1de680260af 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -524,7 +524,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           assert(
             intercept[AnalysisException] {
               sparkSession.catalog.createTable("createdJsonTable", jsonFilePath.toString)
-            }.getMessage.contains("Table createdJsonTable already exists."),
+            }.getMessage.contains("Table default.createdJsonTable already exists."),
             "We should complain that createdJsonTable already exists")
         }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c4cef44b6cc..5ea9a1adaa5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1650,8 +1650,8 @@ class HiveDDLSuite
         // Even if index tables exist, listTables and getTable APIs should still work
         checkAnswer(
           spark.catalog.listTables().toDF(),
-          Row(indexTabName, "default", null, null, false) ::
-            Row(tabName, "default", null, "MANAGED", false) :: Nil)
+          Row(indexTabName, "spark_catalog", Array("default"), null, null, false) ::
+            Row(tabName, "spark_catalog", Array("default"), null, "MANAGED", false) :: Nil)
         assert(spark.catalog.getTable("default", indexTabName).name === indexTabName)
 
         intercept[TableAlreadyExistsException] {

