Posted to commits@spark.apache.org by we...@apache.org on 2022/06/09 03:08:17 UTC
[spark] branch master updated: [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cb55efadea1 [SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
cb55efadea1 is described below
commit cb55efadea1399e1ce6daae5d9ec7896ffce1b93
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Thu Jun 9 11:08:00 2022 +0800
[SPARK-39236][SQL] Make CreateTable and ListTables be compatible with 3 layer namespace
### What changes were proposed in this pull request?
1. Change the `CreateTable` API so that it supports a 3-layer namespace.
2. Change the `ListTables` API so that a) it still supports a single `database` parameter, and b) if that `database` does not exist, it further checks whether the parameter is a qualified `catalog.database` name. Both are sketched below.
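A minimal sketch of the extended usage, assuming a v2 catalog named `testcat` has been registered via the `spark.sql.catalog.testcat` conf (the catalog and object names here are illustrative):
```scala
import org.apache.spark.sql.types.StructType

// CreateTable now accepts a 3-part name: catalog.database.table.
spark.catalog.createTable(
  tableName = "testcat.my_db.my_table",
  source = "parquet",
  schema = new StructType().add("i", "int"),
  description = "demo table",
  options = Map.empty[String, String])

// ListTables now accepts a catalog-qualified namespace.
spark.catalog.listTables("testcat.my_db").show()
```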
### Why are the changes needed?
CreateTable and ListTables do not support the 3-layer namespace.
### Does this PR introduce _any_ user-facing change?
Yes. The API change here is backward compatible, and it extends the API to support the 3-layer namespace (e.g. `catalog.database.table`); existing single-part calls keep working, as sketched below.
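For example, pre-existing single-part calls against the session catalog behave as before (a sketch; `t1` is a made-up name):
```scala
import org.apache.spark.sql.types.StructType

// Single database name, resolved in the session catalog as in Spark 3.3.
spark.catalog.listTables("default")

// Single-part table name, created in the current database as before.
spark.catalog.createTable(
  tableName = "t1",
  source = "parquet",
  schema = new StructType().add("i", "int"),
  description = "",
  options = Map.empty[String, String])
```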
### How was this patch tested?
UT
Closes #36586 from amaliujia/catalogapi.
Authored-by: Rui Wang <ru...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
R/pkg/tests/fulltests/test_sparkSQL.R | 5 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 4 +
.../spark/sql/errors/QueryCompilationErrors.scala | 4 +
.../org/apache/spark/sql/catalog/interface.scala | 26 +++-
.../apache/spark/sql/internal/CatalogImpl.scala | 92 +++++++++++---
.../spark/sql/execution/GlobalTempViewSuite.scala | 4 +-
.../apache/spark/sql/internal/CatalogSuite.scala | 140 ++++++++++++++++++++-
.../spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 4 +-
9 files changed, 251 insertions(+), 30 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index df1094bacef..f0abc96613d 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -663,7 +663,7 @@ test_that("test tableNames and tables", {
expect_equal(count(tables), count + 1)
expect_equal(count(tables()), count(tables))
expect_true("tableName" %in% colnames(tables()))
- expect_true(all(c("tableName", "database", "isTemporary") %in% colnames(tables())))
+ expect_true(all(c("tableName", "namespace", "isTemporary") %in% colnames(tables())))
suppressWarnings(registerTempTable(df, "table2"))
tables <- listTables()
@@ -4026,7 +4026,8 @@ test_that("catalog APIs, listTables, listColumns, listFunctions", {
tb <- listTables()
count <- count(tables())
expect_equal(nrow(tb), count)
- expect_equal(colnames(tb), c("name", "database", "description", "tableType", "isTemporary"))
+ expect_equal(colnames(tb),
+ c("name", "catalog", "namespace", "description", "tableType", "isTemporary"))
createOrReplaceTempView(as.DataFrame(cars), "cars")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d6c80f98bf7..0152f49c798 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -965,6 +965,10 @@ class SessionCatalog(
isTempView(nameParts.asTableIdentifier)
}
+ def isGlobalTempViewDB(dbName: String): Boolean = {
+ globalTempViewManager.database.equals(dbName)
+ }
+
def lookupTempView(name: TableIdentifier): Option[View] = {
val tableName = formatTableName(name.table)
if (name.database.isEmpty) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 551eaa6aeb7..68f4320ff67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -2188,6 +2188,10 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
new AnalysisException(s"Table or view '$tableName' not found in database '$dbName'")
}
+ def tableOrViewNotFound(ident: Seq[String]): Throwable = {
+ new AnalysisException(s"Table or view '${ident.quoted}' not found")
+ }
+
def unexpectedTypeOfRelationError(relation: LogicalPlan, tableName: String): Throwable = {
new AnalysisException(
s"Unexpected type ${relation.getClass.getCanonicalName} of the relation $tableName")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
index cb270875228..1e4e0b14745 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/interface.scala
@@ -55,7 +55,8 @@ class Database(
* A table in Spark, as returned by the `listTables` method in [[Catalog]].
*
* @param name name of the table.
- * @param database name of the database the table belongs to.
+ * @param catalog name of the catalog that the table belongs to.
+ * @param namespace the namespace that the table belongs to.
* @param description description of the table.
* @param tableType type of the table (e.g. view, table).
* @param isTemporary whether the table is a temporary table.
@@ -64,15 +65,36 @@ class Database(
@Stable
class Table(
val name: String,
- @Nullable val database: String,
+ @Nullable val catalog: String,
+ @Nullable val namespace: Array[String],
@Nullable val description: String,
val tableType: String,
val isTemporary: Boolean)
extends DefinedByConstructorParams {
+ def this(
+ name: String,
+ database: String,
+ description: String,
+ tableType: String,
+ isTemporary: Boolean) = {
+ this(name, null, Array(database), description, tableType, isTemporary)
+ }
+
+ def database: String = {
+ if (namespace == null) {
+ null
+ } else if (namespace.length == 1) {
+ namespace(0)
+ } else {
+ null
+ }
+ }
+
override def toString: String = {
"Table[" +
s"name='$name', " +
+ Option(catalog).map { d => s"catalog='$d', " }.getOrElse("") +
Option(database).map { d => s"database='$d', " }.getOrElse("") +
Option(description).map { d => s"description='$d', " }.getOrElse("") +
s"tableType='$tableType', " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f2057285030..4b6ea33f3e6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,13 +23,14 @@ import scala.util.control.NonFatal
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedTable
+import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, RecoverPartitions, SubqueryAlias, View}
+import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog}
import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
@@ -97,8 +98,22 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("database does not exist")
override def listTables(dbName: String): Dataset[Table] = {
- val tables = sessionCatalog.listTables(dbName).map(makeTable)
- CatalogImpl.makeDataset(tables, sparkSession)
+ // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
+ // a qualified namespace with catalog name. We assume it's a single database name
+ // and check if we can find the dbName in sessionCatalog. If so we listTables under
+ // that database. Otherwise we try 3-part name parsing and locate the database.
+ if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
+ val tables = sessionCatalog.listTables(dbName).map(makeTable)
+ CatalogImpl.makeDataset(tables, sparkSession)
+ } else {
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+ val plan = ShowTables(UnresolvedNamespace(ident), None)
+ val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
+ val tables = ret
+ .map(row => ident ++ Seq(row.getString(1)))
+ .map(makeTable)
+ CatalogImpl.makeDataset(tables, sparkSession)
+ }
}
/**
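A sketch of the two resolution paths above, assuming a v2 catalog named `testcat` is registered:
```scala
// Path 1: the name matches a database in the session catalog, so the
// legacy SessionCatalog.listTables(dbName) branch is taken.
spark.sql("CREATE DATABASE IF NOT EXISTS my_db")
spark.catalog.listTables("my_db")

// Path 2: no such session-catalog database, so the name is parsed as a
// multi-part identifier and resolved via a ShowTables plan.
spark.catalog.listTables("testcat.my_db")
```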
@@ -117,14 +132,45 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
case NonFatal(_) => None
}
val isTemp = sessionCatalog.isTempView(tableIdent)
+ val qualifier =
+ metadata.map(_.identifier.database).getOrElse(tableIdent.database).map(Array(_)).orNull
new Table(
name = tableIdent.table,
- database = metadata.map(_.identifier.database).getOrElse(tableIdent.database).orNull,
+ catalog = CatalogManager.SESSION_CATALOG_NAME,
+ namespace = qualifier,
description = metadata.map(_.comment.orNull).orNull,
tableType = if (isTemp) "TEMPORARY" else metadata.map(_.tableType.name).orNull,
isTemporary = isTemp)
}
+ private def makeTable(ident: Seq[String]): Table = {
+ val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true)
+ val node = sparkSession.sessionState.executePlan(plan).analyzed
+ node match {
+ case t: ResolvedTable =>
+ val isExternal = t.table.properties().getOrDefault(
+ TableCatalog.PROP_EXTERNAL, "false").equals("true")
+ new Table(
+ name = t.identifier.name(),
+ catalog = t.catalog.name(),
+ namespace = t.identifier.namespace(),
+ description = t.table.properties().get("comment"),
+ tableType =
+ if (isExternal) CatalogTableType.EXTERNAL.name
+ else CatalogTableType.MANAGED.name,
+ isTemporary = false)
+ case v: ResolvedView =>
+ new Table(
+ name = v.identifier.name(),
+ catalog = null,
+ namespace = v.identifier.namespace(),
+ description = null,
+ tableType = if (v.isTemp) "TEMPORARY" else "VIEW",
+ isTemporary = v.isTemp)
+ case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
+ }
+ }
+
/**
* Returns a list of functions registered in the current database.
* This includes all temporary functions
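For illustration, a `Table` produced by the new `makeTable(ident)` path carries its catalog and full namespace (a hedged sketch reusing the `testcat.my_db` setup from the tests further below):
```scala
// Hypothetical: inspect the first table listed through the v2 path.
val t = spark.catalog.listTables("testcat.my_db").head()
assert(t.catalog == "testcat")
assert(t.namespace.sameElements(Array("my_db")))
// tableType is "MANAGED" or "EXTERNAL", driven by the "external" property.
assert(!t.isTemporary)
```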
@@ -367,24 +413,38 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
schema: StructType,
description: String,
options: Map[String, String]): DataFrame = {
- val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
val storage = DataSource.buildStorageFormatFromOptions(options)
val tableType = if (storage.locationUri.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}
- val tableDesc = CatalogTable(
- identifier = tableIdent,
- tableType = tableType,
- storage = storage,
- schema = schema,
+ val location = if (storage.locationUri.isDefined) {
+ val locationStr = storage.locationUri.get.toString
+ Some(locationStr)
+ } else {
+ None
+ }
+
+ val tableSpec = TableSpec(
+ properties = Map(),
provider = Some(source),
- comment = { if (description.isEmpty) None else Some(description) }
- )
- val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+ options = options,
+ location = location,
+ comment = { if (description.isEmpty) None else Some(description) },
+ serde = None,
+ external = tableType == CatalogTableType.EXTERNAL)
+
+ val plan = CreateTable(
+ name = UnresolvedDBObjectName(ident, isNamespace = false),
+ tableSchema = schema,
+ partitioning = Seq(),
+ tableSpec = tableSpec,
+ ignoreIfExists = false)
+
sparkSession.sessionState.executePlan(plan).toRdd
- sparkSession.table(tableIdent)
+ sparkSession.table(tableName)
}
/**
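A hedged sketch of the new create path: a `path` option marks the table external, and the location flows into the v2 `TableSpec` (names are illustrative):
```scala
import org.apache.spark.sql.types.StructType

// With a "path" option the storage location is set, so tableType becomes
// EXTERNAL and the location is carried by the TableSpec of the v2 plan.
spark.catalog.createTable(
  tableName = "testcat.my_db.ext_table",
  source = "parquet",
  schema = new StructType().add("i", "int"),
  description = "external demo",
  options = Map("path" -> "/tmp/ext_table"))
```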
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
index 28e82aa14e0..8a635807abb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalog.Table
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint}
+import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@@ -165,7 +166,8 @@ class GlobalTempViewSuite extends QueryTest with SharedSparkSession {
assert(spark.catalog.tableExists(globalTempDB, "src"))
assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
name = "src",
- database = globalTempDB,
+ catalog = CatalogManager.SESSION_CATALOG_NAME,
+ namespace = Array(globalTempDB),
description = null,
tableType = "TEMPORARY",
isTemporary = true).toString)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index edd96e83379..06db60676ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.internal
import java.io.File
+import org.scalatest.BeforeAndAfter
+
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalog.{Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, ScalaReflection, TableIdentifier}
@@ -26,6 +28,9 @@ import org.apache.spark.sql.catalyst.analysis.AnalysisTest
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.Range
+import org.apache.spark.sql.connector.FakeV2Provider
+import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.StorageLevel
@@ -34,7 +39,7 @@ import org.apache.spark.storage.StorageLevel
/**
* Tests for the user-facing [[org.apache.spark.sql.catalog.Catalog]].
*/
-class CatalogSuite extends SharedSparkSession with AnalysisTest {
+class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAfter {
import testImplicits._
private def sessionCatalog: SessionCatalog = spark.sessionState.catalog
@@ -114,6 +119,15 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
}
}
+ before {
+ spark.conf.set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
+ }
+
+ after {
+ spark.sessionState.catalogManager.reset()
+ spark.sessionState.conf.clear()
+ }
+
test("current database") {
assert(spark.catalog.currentDatabase == "default")
assert(sessionCatalog.getCurrentDatabase == "default")
@@ -261,8 +275,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
}
test("Table.toString") {
- assert(new Table("volley", "databasa", "one", "world", isTemporary = true).toString ==
- "Table[name='volley', database='databasa', description='one', " +
+ assert(new Table("volley", null, Array("databasa"), "one", "world", isTemporary = true).toString
+ == "Table[name='volley', database='databasa', description='one', " +
"tableType='world', isTemporary='true']")
assert(new Table("volley", null, null, "world", isTemporary = true).toString ==
"Table[name='volley', tableType='world', isTemporary='true']")
@@ -290,7 +304,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
test("catalog classes format in Dataset.show") {
val db = new Database("nama", "descripta", "locata")
- val table = new Table("nama", "databasa", "descripta", "typa", isTemporary = false)
+ val table = new Table("nama", "cataloa", Array("databasa"), "descripta", "typa",
+ isTemporary = false)
val function = new Function("nama", "databasa", "descripta", "classa", isTemporary = false)
val column = new Column(
"nama", "descripta", "typa", nullable = false, isPartition = true, isBucket = true)
@@ -299,7 +314,9 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
val functionFields = ScalaReflection.getConstructorParameterValues(function)
val columnFields = ScalaReflection.getConstructorParameterValues(column)
assert(dbFields == Seq("nama", "descripta", "locata"))
- assert(tableFields == Seq("nama", "databasa", "descripta", "typa", false))
+ assert(Seq(tableFields(0), tableFields(1), tableFields(3), tableFields(4), tableFields(5)) ==
+ Seq("nama", "cataloa", "descripta", "typa", false))
+ assert(tableFields(2).asInstanceOf[Array[String]].sameElements(Array("databasa")))
assert(functionFields == Seq("nama", "databasa", "descripta", "classa", false))
assert(columnFields == Seq("nama", "descripta", "typa", false, true, true))
val dbString = CatalogImpl.makeDataset(Seq(db), spark).showString(10)
@@ -307,7 +324,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
val functionString = CatalogImpl.makeDataset(Seq(function), spark).showString(10)
val columnString = CatalogImpl.makeDataset(Seq(column), spark).showString(10)
dbFields.foreach { f => assert(dbString.contains(f.toString)) }
- tableFields.foreach { f => assert(tableString.contains(f.toString)) }
+ tableFields.foreach { f => assert(tableString.contains(f.toString) ||
+ tableString.contains(f.asInstanceOf[Array[String]].mkString(""))) }
functionFields.foreach { f => assert(functionString.contains(f.toString)) }
columnFields.foreach { f => assert(columnString.contains(f.toString)) }
}
@@ -553,4 +571,114 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest {
}.getMessage
assert(errMsg.contains("my_temp_table is a temp view. 'recoverPartitions()' expects a table"))
}
+
+ test("three layer namespace compatibility - create managed table") {
+ val catalogName = "testcat"
+ val dbName = "my_db"
+ val tableName = "my_table"
+ val tableSchema = new StructType().add("i", "int")
+ val description = "this is a test table"
+
+ val df = spark.catalog.createTable(
+ tableName = Array(catalogName, dbName, tableName).mkString("."),
+ source = classOf[FakeV2Provider].getName,
+ schema = tableSchema,
+ description = description,
+ options = Map.empty[String, String])
+ assert(df.schema.equals(tableSchema))
+
+ val testCatalog =
+ spark.sessionState.catalogManager.catalog(catalogName).asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
+ assert(table.schema().equals(tableSchema))
+ assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
+ assert(table.properties().get("comment").equals(description))
+ }
+
+ test("three layer namespace compatibility - create external table") {
+ withTempDir { dir =>
+ val catalogName = "testcat"
+ val dbName = "my_db"
+ val tableName = "my_table"
+ val tableSchema = new StructType().add("i", "int")
+ val description = "this is a test table"
+
+ val df = spark.catalog.createTable(
+ tableName = Array(catalogName, dbName, tableName).mkString("."),
+ source = classOf[FakeV2Provider].getName,
+ schema = tableSchema,
+ description = description,
+ options = Map("path" -> dir.getAbsolutePath))
+ assert(df.schema.equals(tableSchema))
+
+ val testCatalog =
+ spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
+ val table = testCatalog.loadTable(Identifier.of(Array(dbName), tableName))
+ assert(table.schema().equals(tableSchema))
+ assert(table.properties().get("provider").equals(classOf[FakeV2Provider].getName))
+ assert(table.properties().get("comment").equals(description))
+ assert(table.properties().get("path").equals(dir.getAbsolutePath))
+ assert(table.properties().get("external").equals("true"))
+ assert(table.properties().get("location").equals("file:" + dir.getAbsolutePath))
+ }
+ }
+
+ test("three layer namespace compatibility - list tables") {
+ withTempDir { dir =>
+ val catalogName = "testcat"
+ val dbName = "my_db"
+ val tableName = "my_table"
+ val tableSchema = new StructType().add("i", "int")
+ val description = "this is a test managed table"
+
+ spark.catalog.createTable(
+ tableName = Array(catalogName, dbName, tableName).mkString("."),
+ source = classOf[FakeV2Provider].getName,
+ schema = tableSchema,
+ description = description,
+ options = Map.empty[String, String])
+
+ val tableName2 = "my_table2"
+ val description2 = "this is a test external table"
+
+ spark.catalog.createTable(
+ tableName = Array(catalogName, dbName, tableName2).mkString("."),
+ source = classOf[FakeV2Provider].getName,
+ schema = tableSchema,
+ description = description2,
+ options = Map("path" -> dir.getAbsolutePath))
+
+ val tables = spark.catalog.listTables("testcat.my_db").collect()
+ assert(tables.size == 2)
+
+ val expectedTable1 =
+ new Table(tableName, catalogName, Array(dbName), description,
+ CatalogTableType.MANAGED.name, false)
+ assert(tables.exists(t =>
+ expectedTable1.name.equals(t.name) && expectedTable1.database.equals(t.database) &&
+ expectedTable1.description.equals(t.description) &&
+ expectedTable1.tableType.equals(t.tableType) &&
+ expectedTable1.isTemporary == t.isTemporary))
+
+ val expectedTable2 =
+ new Table(tableName2, catalogName, Array(dbName), description2,
+ CatalogTableType.EXTERNAL.name, false)
+ assert(tables.exists(t =>
+ expectedTable2.name.equals(t.name) && expectedTable2.database.equals(t.database) &&
+ expectedTable2.description.equals(t.description) &&
+ expectedTable2.tableType.equals(t.tableType) &&
+ expectedTable2.isTemporary == t.isTemporary))
+ }
+ }
+
+ test("list tables when there is `default` catalog") {
+ spark.conf.set("spark.sql.catalog.default", classOf[InMemoryCatalog].getName)
+
+ assert(spark.catalog.listTables("default").collect().isEmpty)
+ createTable("my_table1")
+ createTable("my_table2")
+ createTempTable("my_temp_table")
+ assert(spark.catalog.listTables("default").collect().map(_.name).toSet ==
+ Set("my_table1", "my_table2", "my_temp_table"))
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 16b5d6cf1bf..1de680260af 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -524,7 +524,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
assert(
intercept[AnalysisException] {
sparkSession.catalog.createTable("createdJsonTable", jsonFilePath.toString)
- }.getMessage.contains("Table createdJsonTable already exists."),
+ }.getMessage.contains("Table default.createdJsonTable already exists."),
"We should complain that createdJsonTable already exists")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index c4cef44b6cc..5ea9a1adaa5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1650,8 +1650,8 @@ class HiveDDLSuite
// Even if index tables exist, listTables and getTable APIs should still work
checkAnswer(
spark.catalog.listTables().toDF(),
- Row(indexTabName, "default", null, null, false) ::
- Row(tabName, "default", null, "MANAGED", false) :: Nil)
+ Row(indexTabName, "spark_catalog", Array("default"), null, null, false) ::
+ Row(tabName, "spark_catalog", Array("default"), null, "MANAGED", false) :: Nil)
assert(spark.catalog.getTable("default", indexTabName).name === indexTabName)
intercept[TableAlreadyExistsException] {