Posted to commits@spark.apache.org by we...@apache.org on 2022/08/08 16:06:56 UTC
[spark] branch master updated: [SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5c9175c9da7 [SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl
5c9175c9da7 is described below
commit 5c9175c9da719536123477d7fcc784a4086fbe25
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Aug 9 00:06:19 2022 +0800
[SPARK-39912][SPARK-39828][SQL] Refine CatalogImpl
### What changes were proposed in this pull request?
`CatalogImpl` has been updated quite a bit recently to support v2 catalogs. This PR revisits those changes and refines the code:
1. Fix the naming "3 layer namespace". The Spark catalog plugin supports n-part namespaces, so this PR renames the concept to `qualified name with catalog`.
2. Always use the v2 code path. The v2 code path already covers all the functionality of `CatalogImpl`, so keeping the v1 code path is unnecessary. It also ensures consistent behavior between `db.table` and `spark_catalog.db.table`; previously the behavior was inconsistent in some cases (see the updated tests for functions).
3. Simplify `try {v1 code path} catch {... v2 code path}` to `val name = if (table exists in HMS) {name qualified with spark_catalog} else {parsed name}; v2 code path` (see the sketch below).
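For reference, the simplified lookup pattern now looks roughly like the following (a condensed sketch of the updated `CatalogImpl.getTable` in the diff below; `parseIdent` and `qualifyV1Ident` are the new private helpers this PR introduces):

```scala
override def getTable(tableName: String): Table = {
  val parsed = parseIdent(tableName)
  // Backward compatibility (Spark 3.3 and prior): a 1- or 2-part name that is not a
  // temp view and exists in the Hive Metastore gets qualified with `spark_catalog`.
  val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) &&
      sessionCatalog.tableExists(parsed.asTableIdentifier)) {
    qualifyV1Ident(parsed) // e.g. Seq("t") -> Seq("spark_catalog", "default", "t")
  } else {
    parsed                 // e.g. an already-qualified Seq("testcat", "ns", "t")
  }
  makeTable(nameParts)     // single v2 code path, no try/catch fallback
}
```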
### Why are the changes needed?
Code cleanup.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
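For example, the updated function assertions in `CatalogSuite` (condensed below; `my_func1` is a persistent function created in `my_db1` by the test setup) verify that unqualified and catalog-qualified lookups now return the same metadata:

```scala
val func1a = spark.catalog.getFunction("my_db1.my_func1")
val func1b = spark.catalog.getFunction("spark_catalog.my_db1.my_func1")
// Previously func1a took the v1 code path and returned catalog = null and
// description = null, while func1b returned "spark_catalog" and "N/A.".
assert(func1a.catalog === "spark_catalog" && func1b.catalog === "spark_catalog")
assert(func1a.description === "N/A." && func1b.description === "N/A.")
```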
Closes #37287 from cloud-fan/catalog.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
R/pkg/tests/fulltests/test_sparkSQL.R | 5 +-
python/pyspark/sql/catalog.py | 6 +-
python/pyspark/sql/tests/test_catalog.py | 2 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 18 +-
.../org/apache/spark/sql/catalog/Catalog.scala | 102 ++---
.../apache/spark/sql/internal/CatalogImpl.scala | 474 ++++++++++-----------
.../spark/sql/execution/GlobalTempViewSuite.scala | 3 +-
.../apache/spark/sql/internal/CatalogSuite.scala | 81 +++-
8 files changed, 362 insertions(+), 329 deletions(-)
diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index fc54d89a1a4..27994ed76b2 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4098,14 +4098,13 @@ test_that("catalog APIs, listTables, getTable, listColumns, listFunctions, funct
c("name", "description", "dataType", "nullable", "isPartition", "isBucket"))
expect_equal(collect(c)[[1]][[1]], "speed")
expect_error(listColumns("zxwtyswklpf", "default"),
- paste("Error in listColumns : analysis error - Table",
- "'zxwtyswklpf' does not exist in database 'default'"))
+ paste("Table or view not found: spark_catalog.default.zxwtyswklpf"))
f <- listFunctions()
expect_true(nrow(f) >= 200) # 250
expect_equal(colnames(f),
c("name", "catalog", "namespace", "description", "className", "isTemporary"))
- expect_equal(take(orderBy(f, "className"), 1)$className,
+ expect_equal(take(orderBy(filter(f, "className IS NOT NULL"), "className"), 1)$className,
"org.apache.spark.sql.catalyst.expressions.Abs")
expect_error(listFunctions("zxwtyswklpf_db"),
paste("Error in listFunctions : no such database - Database",
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 548750d7120..10c9ab5f6d2 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -164,7 +164,7 @@ class Catalog:
Examples
--------
>>> spark.catalog.getDatabase("default")
- Database(name='default', catalog=None, description='default database', ...
+ Database(name='default', catalog='spark_catalog', description='default database', ...
>>> spark.catalog.getDatabase("spark_catalog.default")
Database(name='default', catalog='spark_catalog', description='default database', ...
"""
@@ -376,9 +376,9 @@ class Catalog:
--------
>>> func = spark.sql("CREATE FUNCTION my_func1 AS 'test.org.apache.spark.sql.MyDoubleAvg'")
>>> spark.catalog.getFunction("my_func1")
- Function(name='my_func1', catalog=None, namespace=['default'], ...
+ Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
>>> spark.catalog.getFunction("default.my_func1")
- Function(name='my_func1', catalog=None, namespace=['default'], ...
+ Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
>>> spark.catalog.getFunction("spark_catalog.default.my_func1")
Function(name='my_func1', catalog='spark_catalog', namespace=['default'], ...
>>> spark.catalog.getFunction("my_func2")
diff --git a/python/pyspark/sql/tests/test_catalog.py b/python/pyspark/sql/tests/test_catalog.py
index 7d81234bce2..24cd67251a8 100644
--- a/python/pyspark/sql/tests/test_catalog.py
+++ b/python/pyspark/sql/tests/test_catalog.py
@@ -198,7 +198,7 @@ class CatalogTests(ReusedSQLTestCase):
self.assertTrue("to_unix_timestamp" in functions)
self.assertTrue("current_database" in functions)
self.assertEqual(functions["+"].name, "+")
- self.assertEqual(functions["+"].description, None)
+ self.assertEqual(functions["+"].description, "expr1 + expr2 - Returns `expr1`+`expr2`.")
self.assertEqual(
functions["+"].className, "org.apache.spark.sql.catalyst.expressions.Add"
)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 16d89c9b2e4..a0c98aac6c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -971,13 +971,17 @@ class SessionCatalog(
}
def lookupTempView(name: TableIdentifier): Option[View] = {
- val tableName = formatTableName(name.table)
- if (name.database.isEmpty) {
- tempViews.get(tableName).map(getTempViewPlan)
- } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
- globalTempViewManager.get(tableName).map(getTempViewPlan)
- } else {
- None
+ lookupLocalOrGlobalRawTempView(name.database.toSeq :+ name.table).map(getTempViewPlan)
+ }
+
+ /**
+ * Return the raw logical plan of a temporary local or global view for the given name.
+ */
+ def lookupLocalOrGlobalRawTempView(name: Seq[String]): Option[TemporaryViewRelation] = {
+ name match {
+ case Seq(v) => getRawTempView(v)
+ case Seq(db, v) if isGlobalTempViewDB(db) => getRawGlobalTempView(v)
+ case _ => None
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 29b35229e97..82ac8fd6049 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -33,28 +33,28 @@ import org.apache.spark.storage.StorageLevel
abstract class Catalog {
/**
- * Returns the current default database in this session.
+ * Returns the current database (namespace) in this session.
*
* @since 2.0.0
*/
def currentDatabase: String
/**
- * Sets the current default database in this session.
+ * Sets the current database (namespace) in this session.
*
* @since 2.0.0
*/
def setCurrentDatabase(dbName: String): Unit
/**
- * Returns a list of databases available across all sessions.
+ * Returns a list of databases (namespaces) available within the current catalog.
*
* @since 2.0.0
*/
def listDatabases(): Dataset[Database]
/**
- * Returns a list of tables/views in the current database.
+ * Returns a list of tables/views in the current database (namespace).
* This includes all temporary views.
*
* @since 2.0.0
@@ -62,7 +62,8 @@ abstract class Catalog {
def listTables(): Dataset[Table]
/**
- * Returns a list of tables/views in the specified database.
+ * Returns a list of tables/views in the specified database (namespace); the name can be
+ * qualified with a catalog.
* This includes all temporary views.
*
* @since 2.0.0
@@ -71,16 +72,17 @@ abstract class Catalog {
def listTables(dbName: String): Dataset[Table]
/**
- * Returns a list of functions registered in the current database.
- * This includes all temporary functions
+ * Returns a list of functions registered in the current database (namespace).
+ * This includes all temporary functions.
*
* @since 2.0.0
*/
def listFunctions(): Dataset[Function]
/**
- * Returns a list of functions registered in the specified database.
- * This includes all temporary functions
+ * Returns a list of functions registered in the specified database (namespace); the name
+ * can be qualified with a catalog.
+ * This includes all built-in and temporary functions.
*
* @since 2.0.0
*/
@@ -90,21 +92,22 @@ abstract class Catalog {
/**
* Returns a list of columns for the given table/view or temporary view.
*
- * @param tableName is either a qualified or unqualified name that designates a table/view.
- * If no database identifier is provided, it refers to a temporary view or
- * a table/view in the current database.
+ * @param tableName is either a qualified or unqualified name that designates a table/view. It
+ * follows the same resolution rule as SQL: search for temp views first, then
+ * table/views in the current database (namespace).
* @since 2.0.0
*/
@throws[AnalysisException]("table does not exist")
def listColumns(tableName: String): Dataset[Column]
/**
- * Returns a list of columns for the given table/view in the specified database.
+ * Returns a list of columns for the given table/view in the specified database under the Hive
+ * Metastore.
*
- * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
- * use listColumns(tableName) instead.
+ * To list columns of a table/view in other catalogs, please use `listColumns(tableName)` with
+ * a qualified table/view name instead.
*
- * @param dbName is a name that designates a database.
+ * @param dbName is an unqualified name that designates a database.
* @param tableName is an unqualified name that designates a table/view.
* @since 2.0.0
*/
@@ -112,8 +115,8 @@ abstract class Catalog {
def listColumns(dbName: String, tableName: String): Dataset[Column]
/**
- * Get the database with the specified name. This throws an AnalysisException when the database
- * cannot be found.
+ * Get the database (namespace) with the specified name (can be qualified with catalog). This
+ * throws an AnalysisException when the database (namespace) cannot be found.
*
* @since 2.1.0
*/
@@ -124,20 +127,20 @@ abstract class Catalog {
* Get the table or view with the specified name. This table can be a temporary view or a
* table/view. This throws an AnalysisException when no Table can be found.
*
- * @param tableName is either a qualified or unqualified name that designates a table/view.
- * If no database identifier is provided, it refers to a table/view in
- * the current database.
+ * @param tableName is either a qualified or unqualified name that designates a table/view. It
+ * follows the same resolution rule as SQL: search for temp views first, then
+ * table/views in the current database (namespace).
* @since 2.1.0
*/
@throws[AnalysisException]("table does not exist")
def getTable(tableName: String): Table
/**
- * Get the table or view with the specified name in the specified database. This throws an
- * AnalysisException when no Table can be found.
+ * Get the table or view with the specified name in the specified database under the Hive
+ * Metastore. This throws an AnalysisException when no Table can be found.
*
- * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
- * use getTable(tableName) instead.
+ * To get a table/view in other catalogs, please use `getTable(tableName)` with a qualified
+ * table/view name instead.
*
* @since 2.1.0
*/
@@ -148,22 +151,22 @@ abstract class Catalog {
* Get the function with the specified name. This function can be a temporary function or a
* function. This throws an AnalysisException when the function cannot be found.
*
- * @param functionName is either a qualified or unqualified name that designates a function.
- * If no database identifier is provided, it refers to a temporary function
- * or a function in the current database.
+ * @param functionName is either a qualified or unqualified name that designates a function. It
+ * follows the same resolution rule as SQL: search for built-in/temp
+ * functions first, then functions in the current database (namespace).
* @since 2.1.0
*/
@throws[AnalysisException]("function does not exist")
def getFunction(functionName: String): Function
/**
- * Get the function with the specified name. This throws an AnalysisException when the function
- * cannot be found.
+ * Get the function with the specified name in the specified database under the Hive Metastore.
+ * This throws an AnalysisException when the function cannot be found.
*
- * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
- * use getFunction(functionName) instead.
+ * To get functions in other catalogs, please use `getFunction(functionName)` with a qualified
+ * function name instead.
*
- * @param dbName is a name that designates a database.
+ * @param dbName is an unqualified name that designates a database.
* @param functionName is an unqualified name that designates a function in the specified database
* @since 2.1.0
*/
@@ -171,7 +174,8 @@ abstract class Catalog {
def getFunction(dbName: String, functionName: String): Function
/**
- * Check if the database with the specified name exists.
+ * Check if the database (namespace) with the specified name exists (the name can be qualified
+ * with catalog).
*
* @since 2.1.0
*/
@@ -181,20 +185,21 @@ abstract class Catalog {
* Check if the table or view with the specified name exists. This can either be a temporary
* view or a table/view.
*
- * @param tableName is either a qualified or unqualified name that designates a table/view.
- * If no database identifier is provided, it refers to a table/view in
- * the current database.
+ * @param tableName is either a qualified or unqualified name that designates a table/view. It
+ * follows the same resolution rule as SQL: search for temp views first, then
+ * table/views in the current database (namespace).
* @since 2.1.0
*/
def tableExists(tableName: String): Boolean
/**
- * Check if the table or view with the specified name exists in the specified database.
+ * Check if the table or view with the specified name exists in the specified database under the
+ * Hive Metastore.
*
- * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
- * use tableExists(tableName) instead.
+ * To check the existence of a table/view in other catalogs, please use `tableExists(tableName)`
+ * with a qualified table/view name instead.
*
- * @param dbName is a name that designates a database.
+ * @param dbName is an unqualified name that designates a database.
* @param tableName is an unqualified name that designates a table.
* @since 2.1.0
*/
@@ -204,20 +209,21 @@ abstract class Catalog {
* Check if the function with the specified name exists. This can either be a temporary function
* or a function.
*
- * @param functionName is either a qualified or unqualified name that designates a function.
- * If no database identifier is provided, it refers to a function in
- * the current database.
+ * @param functionName is either a qualified or unqualified name that designates a function. It
+ * follows the same resolution rule as SQL: search for built-in/temp
+ * functions first, then functions in the current database (namespace).
* @since 2.1.0
*/
def functionExists(functionName: String): Boolean
/**
- * Check if the function with the specified name exists in the specified database.
+ * Check if the function with the specified name exists in the specified database under the
+ * Hive Metastore.
*
- * This API does not support 3 layer namespace since 3.4.0. To use 3 layer namespace,
- * use functionExists(functionName) instead.
+ * To check the existence of functions in other catalogs, please use `functionExists(functionName)`
+ * with a qualified function name instead.
*
- * @param dbName is a name that designates a database.
+ * @param dbName is an unqualified name that designates a database.
* @param functionName is an unqualified name that designates a function.
* @since 2.1.0
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e11b349777e..657ed87e609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -23,14 +23,14 @@ import scala.util.control.NonFatal
import org.apache.spark.sql._
import org.apache.spark.sql.catalog.{Catalog, CatalogMetadata, Column, Database, Function, Table}
import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, ResolvedNamespace, ResolvedNonPersistentFunc, ResolvedPersistentFunc, ResolvedTable, ResolvedView, UnresolvedFunc, UnresolvedIdentifier, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, SubqueryAlias, TableSpec, View}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, FunctionCatalog, Identifier, SupportsNamespaces, Table => V2Table, TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, IdentifierHelper, MultipartIdentifierHelper, TransformHelper}
import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.connector.V1Function
@@ -45,15 +45,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
private def sessionCatalog: SessionCatalog = sparkSession.sessionState.catalog
- private def requireDatabaseExists(dbName: String): Unit = {
- if (!sessionCatalog.databaseExists(dbName)) {
- throw QueryCompilationErrors.databaseDoesNotExistError(dbName)
- }
+ private def parseIdent(name: String): Seq[String] = {
+ sparkSession.sessionState.sqlParser.parseMultipartIdentifier(name)
}
- private def requireTableExists(dbName: String, tableName: String): Unit = {
- if (!sessionCatalog.tableExists(TableIdentifier(tableName, Some(dbName)))) {
- throw QueryCompilationErrors.tableDoesNotExistInDatabaseError(tableName, dbName)
+ private def qualifyV1Ident(nameParts: Seq[String]): Seq[String] = {
+ assert(nameParts.length == 1 || nameParts.length == 2)
+ if (nameParts.length == 1) {
+ Seq(CatalogManager.SESSION_CATALOG_NAME, sessionCatalog.getCurrentDatabase) ++ nameParts
+ } else {
+ CatalogManager.SESSION_CATALOG_NAME +: nameParts
}
}
@@ -68,32 +69,27 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("database does not exist")
override def setCurrentDatabase(dbName: String): Unit = {
- // we assume dbName will not include the catalog prefix. e.g. if you call
- // setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog.
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
- sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray)
+ // we assume `dbName` will not include the catalog name. e.g. if you call
+ // `setCurrentDatabase("catalog.db")`, it will search for a database 'catalog.db' in the current
+ // catalog.
+ sparkSession.sessionState.catalogManager.setCurrentNamespace(parseIdent(dbName).toArray)
}
/**
* Returns a list of databases available across all sessions.
*/
override def listDatabases(): Dataset[Database] = {
- val catalog = currentCatalog()
- val plan = ShowNamespaces(UnresolvedNamespace(Seq(catalog)), None)
- val databases = sparkSession.sessionState.executePlan(plan).toRdd.collect()
- .map(row => catalog + "." + row.getString(0))
- .map(getDatabase)
+ val plan = ShowNamespaces(UnresolvedNamespace(Nil), None)
+ val qe = sparkSession.sessionState.executePlan(plan)
+ val catalog = qe.analyzed.collectFirst {
+ case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog
+ }.get
+ val databases = qe.toRdd.collect().map { row =>
+ getNamespace(catalog, parseIdent(row.getString(0)))
+ }
CatalogImpl.makeDataset(databases, sparkSession)
}
- private def makeDatabase(dbName: String): Database = {
- val metadata = sessionCatalog.getDatabaseMetadata(dbName)
- new Database(
- name = metadata.name,
- description = metadata.description,
- locationUri = CatalogUtils.URIToString(metadata.locationUri))
- }
-
/**
* Returns a list of tables in the current database.
* This includes all temporary tables.
@@ -110,74 +106,93 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
override def listTables(dbName: String): Dataset[Table] = {
// `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
// a qualified namespace with catalog name. We assume it's a single database name
- // and check if we can find the dbName in sessionCatalog. If so we listTables under
- // that database. Otherwise we try 3-part name parsing and locate the database.
- if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
- val tables = sessionCatalog.listTables(dbName).map(makeTable)
- CatalogImpl.makeDataset(tables, sparkSession)
+ // and check if we can find it in the sessionCatalog. If so we list tables under
+ // that database. Otherwise we will resolve the catalog/namespace and list tables there.
+ val namespace = if (sessionCatalog.databaseExists(dbName)) {
+ Seq(CatalogManager.SESSION_CATALOG_NAME, dbName)
} else {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
- val plan = ShowTables(UnresolvedNamespace(ident), None)
- val ret = sparkSession.sessionState.executePlan(plan).toRdd.collect()
- val tables = ret
- .map(row => ident ++ Seq(row.getString(1)))
- .map(makeTable)
- CatalogImpl.makeDataset(tables, sparkSession)
+ parseIdent(dbName)
}
- }
+ val plan = ShowTables(UnresolvedNamespace(namespace), None)
+ val qe = sparkSession.sessionState.executePlan(plan)
+ val catalog = qe.analyzed.collectFirst {
+ case ShowTables(r: ResolvedNamespace, _, _) => r.catalog
+ case _: ShowTablesCommand =>
+ sparkSession.sessionState.catalogManager.v2SessionCatalog
+ }.get
+ val tables = qe.toRdd.collect().map { row =>
+ val tableName = row.getString(1)
+ val namespaceName = row.getString(0)
+ val isTemp = row.getBoolean(2)
+ if (isTemp) {
+ // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here.
+ val ns = if (namespaceName.isEmpty) Nil else Seq(namespaceName)
+ makeTable(ns :+ tableName)
+ } else {
+ val ns = parseIdent(namespaceName)
+ makeTable(catalog.name() +: ns :+ tableName)
+ }
+ }
+ CatalogImpl.makeDataset(tables, sparkSession)
+ }
+
+ private def makeTable(nameParts: Seq[String]): Table = {
+ sessionCatalog.lookupLocalOrGlobalRawTempView(nameParts).map { tempView =>
+ new Table(
+ name = tempView.tableMeta.identifier.table,
+ catalog = null,
+ namespace = tempView.tableMeta.identifier.database.toArray,
+ description = tempView.tableMeta.comment.orNull,
+ tableType = "TEMPORARY",
+ isTemporary = true)
+ }.getOrElse {
+ val plan = UnresolvedIdentifier(nameParts)
+ sparkSession.sessionState.executePlan(plan).analyzed match {
+ case ResolvedIdentifier(catalog: TableCatalog, ident) =>
+ val tableOpt = try {
+ loadTable(catalog, ident)
+ } catch {
// Even if the table exists, errors may still happen. For example, Spark can't read Hive's
+ // index table. We return a Table without description and tableType in this case.
+ case NonFatal(_) =>
+ Some(new Table(
+ name = ident.name(),
+ catalog = catalog.name(),
+ namespace = ident.namespace(),
+ description = null,
+ tableType = null,
+ isTemporary = false))
+ }
+ tableOpt.getOrElse(throw QueryCompilationErrors.tableOrViewNotFound(nameParts))
- /**
- * Returns a Table for the given table/view or temporary view.
- *
- * Note that this function requires the table already exists in the Catalog.
- *
- * If the table metadata retrieval failed due to any reason (e.g., table serde class
- * is not accessible or the table type is not accepted by Spark SQL), this function
- * still returns the corresponding Table without the description and tableType)
- */
- private def makeTable(tableIdent: TableIdentifier): Table = {
- val metadata = try {
- Some(sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent))
- } catch {
- case NonFatal(_) => None
+ case _ => throw QueryCompilationErrors.tableOrViewNotFound(nameParts)
+ }
}
- val isTemp = sessionCatalog.isTempView(tableIdent)
- val qualifier =
- metadata.map(_.identifier.database).getOrElse(tableIdent.database).map(Array(_)).orNull
- new Table(
- name = tableIdent.table,
- catalog = CatalogManager.SESSION_CATALOG_NAME,
- namespace = qualifier,
- description = metadata.map(_.comment.orNull).orNull,
- tableType = if (isTemp) "TEMPORARY" else metadata.map(_.tableType.name).orNull,
- isTemporary = isTemp)
- }
-
- private def makeTable(ident: Seq[String]): Table = {
- val plan = UnresolvedTableOrView(ident, "Catalog.listTables", true)
- val node = sparkSession.sessionState.executePlan(plan).analyzed
- node match {
- case t: ResolvedTable =>
- val isExternal = t.table.properties().getOrDefault(
+ }
+
+ private def loadTable(catalog: TableCatalog, ident: Identifier): Option[Table] = {
+ // TODO: support v2 view when it gets implemented.
+ CatalogV2Util.loadTable(catalog, ident).map {
+ case v1: V1Table if v1.v1Table.tableType == CatalogTableType.VIEW =>
+ new Table(
+ name = v1.v1Table.identifier.table,
+ catalog = catalog.name(),
+ namespace = v1.v1Table.identifier.database.toArray,
+ description = v1.v1Table.comment.orNull,
+ tableType = "VIEW",
+ isTemporary = false)
+ case t: V2Table =>
+ val isExternal = t.properties().getOrDefault(
TableCatalog.PROP_EXTERNAL, "false").equals("true")
new Table(
- name = t.identifier.name(),
- catalog = t.catalog.name(),
- namespace = t.identifier.namespace(),
- description = t.table.properties().get("comment"),
+ name = ident.name(),
+ catalog = catalog.name(),
+ namespace = ident.namespace(),
+ description = t.properties().get("comment"),
tableType =
if (isExternal) CatalogTableType.EXTERNAL.name
else CatalogTableType.MANAGED.name,
isTemporary = false)
- case v: ResolvedView =>
- new Table(
- name = v.identifier.name(),
- catalog = null,
- namespace = v.identifier.namespace(),
- description = null,
- tableType = if (v.isTemp) "TEMPORARY" else "VIEW",
- isTemporary = v.isTemp)
- case _ => throw QueryCompilationErrors.tableOrViewNotFound(ident)
}
}
@@ -197,48 +212,37 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
override def listFunctions(dbName: String): Dataset[Function] = {
// `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
// a qualified namespace with catalog name. We assume it's a single database name
- // and check if we can find the dbName in sessionCatalog. If so we listFunctions under
- // that database. Otherwise we try 3-part name parsing and locate the database.
- if (sessionCatalog.databaseExists(dbName)) {
- val functions = sessionCatalog.listFunctions(dbName)
- .map { case (functIdent, _) => makeFunction(functIdent) }
- CatalogImpl.makeDataset(functions, sparkSession)
+ // and check if we can find it in the sessionCatalog. If so we list functions under
+ // that database. Otherwise we will resolve the catalog/namespace and list functions there.
+ val namespace = if (sessionCatalog.databaseExists(dbName)) {
+ Seq(CatalogManager.SESSION_CATALOG_NAME, dbName)
} else {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
- val functions = collection.mutable.ArrayBuilder.make[Function]
-
- // built-in functions
- val plan0 = ShowFunctions(UnresolvedNamespace(ident),
- userScope = false, systemScope = true, None)
- sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row =>
- // `lookupBuiltinOrTempFunction` and `lookupBuiltinOrTempTableFunction` in Analyzer
- // require the input identifier only contains the function name, otherwise, built-in
- // functions will be skipped.
- val name = row.getString(0)
- functions += makeFunction(Seq(name))
- }
-
- // user functions
- val plan1 = ShowFunctions(UnresolvedNamespace(ident),
- userScope = true, systemScope = false, None)
- sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row =>
- // `row.getString(0)` may contain dbName like `db.function`, so extract the function name.
- val name = row.getString(0).split("\\.").last
- functions += makeFunction(ident :+ name)
- }
+ parseIdent(dbName)
+ }
+ val functions = collection.mutable.ArrayBuilder.make[Function]
+
+ // TODO: The SHOW FUNCTIONS should tell us the function type (built-in, temp, persistent) and
+ // we can simplify the code below quite a bit. For now we need to list built-in functions
+ // separately as several built-in function names are not parsable, such as `!=`.
+
+ // List built-in functions. We don't need to specify the namespace here as SHOW FUNCTIONS with
+ // only system scope does not need to know the catalog and namespace.
+ val plan0 = ShowFunctions(UnresolvedNamespace(Nil), userScope = false, systemScope = true, None)
+ sparkSession.sessionState.executePlan(plan0).toRdd.collect().foreach { row =>
+ // Built-in functions do not belong to any catalog or namespace. We can only look them up
+ // with a single-part name.
+ val name = row.getString(0)
+ functions += makeFunction(Seq(name))
+ }
- CatalogImpl.makeDataset(functions.result(), sparkSession)
+ // List user functions.
+ val plan1 = ShowFunctions(UnresolvedNamespace(namespace),
+ userScope = true, systemScope = false, None)
+ sparkSession.sessionState.executePlan(plan1).toRdd.collect().foreach { row =>
+ functions += makeFunction(parseIdent(row.getString(0)))
}
- }
- private def makeFunction(funcIdent: FunctionIdentifier): Function = {
- val metadata = sessionCatalog.lookupFunctionInfo(funcIdent)
- new Function(
- name = metadata.getName,
- database = metadata.getDb,
- description = null, // for now, this is always undefined
- className = metadata.getClassName,
- isTemporary = metadata.getDb == null)
+ CatalogImpl.makeDataset(functions.result(), sparkSession)
}
private def makeFunction(ident: Seq[String]): Function = {
@@ -279,23 +283,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("table does not exist")
override def listColumns(tableName: String): Dataset[Column] = {
- // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name
- // and optionally contains a database name(thus a TableIdentifier), then we look up the table in
- // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of
- // string as the qualified identifier and resolve the table through SQL analyzer.
- try {
- val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- if (tableExists(ident.database.orNull, ident.table)) {
- listColumns(ident)
- } else {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
- listColumns(ident)
- }
- } catch {
- case e: org.apache.spark.sql.catalyst.parser.ParseException =>
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
- listColumns(ident)
+ val parsed = parseIdent(tableName)
+ // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in
+ // the Hive Metastore first.
+ val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) &&
+ sessionCatalog.tableExists(parsed.asTableIdentifier)) {
+ qualifyV1Ident(parsed)
+ } else {
+ parsed
}
+ listColumns(nameParts)
}
/**
@@ -303,25 +300,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
@throws[AnalysisException]("database or table does not exist")
override def listColumns(dbName: String, tableName: String): Dataset[Column] = {
- requireTableExists(dbName, tableName)
- listColumns(TableIdentifier(tableName, Some(dbName)))
- }
-
- private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
- val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
-
- val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
- val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet
- val columns = tableMetadata.schema.map { c =>
- new Column(
- name = c.name,
- description = c.getComment().orNull,
- dataType = CharVarcharUtils.getRawType(c.metadata).getOrElse(c.dataType).catalogString,
- nullable = c.nullable,
- isPartition = partitionColumnNames.contains(c.name),
- isBucket = bucketColumnNames.contains(c.name))
- }
- CatalogImpl.makeDataset(columns, sparkSession)
+ // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the
+ // Hive Metastore.
+ listColumns(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName))
}
private def listColumns(ident: Seq[String]): Dataset[Column] = {
@@ -361,41 +342,44 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
CatalogImpl.makeDataset(columns, sparkSession)
}
+ private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match {
+ case catalog: SupportsNamespaces =>
+ val metadata = catalog.loadNamespaceMetadata(ns.toArray)
+ new Database(
+ name = ns.quoted,
+ catalog = catalog.name,
+ description = metadata.get(SupportsNamespaces.PROP_COMMENT),
+ locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
+ // If the catalog doesn't support namespaces, we assume it's an implicit namespace, which always
+ // exists but has no metadata.
+ case catalog: CatalogPlugin =>
+ new Database(
+ name = ns.quoted,
+ catalog = catalog.name,
+ description = null,
+ locationUri = null)
+ case _ => new Database(name = ns.quoted, description = null, locationUri = null)
+ }
/**
* Gets the database with the specified name. This throws an `AnalysisException` when no
* `Database` can be found.
*/
override def getDatabase(dbName: String): Database = {
- // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or a
- // qualified namespace with catalog name. To maintain backwards compatibility, we first assume
- // it's a single database name and return the database from sessionCatalog if it exists.
- // Otherwise we try 3-part name parsing and locate the database. If the parsed identifier
- // contains both catalog name and database name, we then search the database in the catalog.
- if (sessionCatalog.databaseExists(dbName) || sessionCatalog.isGlobalTempViewDB(dbName)) {
- makeDatabase(dbName)
+ // `dbName` could be either a single database name (behavior in Spark 3.3 and prior) or
+ // a qualified namespace with catalog name. We assume it's a single database name
+ // and check if we can find it in the sessionCatalog. Otherwise we will parse `dbName` and
+ // resolve catalog/namespace with it.
+ val namespace = if (sessionCatalog.databaseExists(dbName)) {
+ Seq(CatalogManager.SESSION_CATALOG_NAME, dbName)
} else {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
- val plan = UnresolvedNamespace(ident)
- val resolved = sparkSession.sessionState.executePlan(plan).analyzed
- resolved match {
- case ResolvedNamespace(catalog: SupportsNamespaces, namespace) =>
- val metadata = catalog.loadNamespaceMetadata(namespace.toArray)
- new Database(
- name = namespace.quoted,
- catalog = catalog.name,
- description = metadata.get(SupportsNamespaces.PROP_COMMENT),
- locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
- // similar to databaseExists: if the catalog doesn't support namespaces, we assume it's an
- // implicit namespace, which exists but has no metadata.
- case ResolvedNamespace(catalog: CatalogPlugin, namespace) =>
- new Database(
- name = namespace.quoted,
- catalog = catalog.name,
- description = null,
- locationUri = null)
- case _ => new Database(name = dbName, description = null, locationUri = null)
- }
+ sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+ }
+ val plan = UnresolvedNamespace(namespace)
+ sparkSession.sessionState.executePlan(plan).analyzed match {
+ case ResolvedNamespace(catalog, namespace) =>
+ getNamespace(catalog, namespace)
+ case _ => new Database(name = dbName, description = null, locationUri = null)
}
}
@@ -404,26 +388,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* table/view. This throws an `AnalysisException` when no `Table` can be found.
*/
override def getTable(tableName: String): Table = {
- // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name
- // and optionally contains a database name(thus a TableIdentifier), then we look up the table in
- // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of
- // string as the qualified identifier and resolve the table through SQL analyzer.
- try {
- val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
- if (tableExists(ident.database.orNull, ident.table)) {
- makeTable(ident)
- } else {
- getTable3LNamespace(tableName)
- }
- } catch {
- case e: org.apache.spark.sql.catalyst.parser.ParseException =>
- getTable3LNamespace(tableName)
+ val parsed = parseIdent(tableName)
+ // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in
+ // the Hive Metastore first.
+ val nameParts = if (parsed.length <= 2 && !sessionCatalog.isTempView(parsed) &&
+ sessionCatalog.tableExists(parsed.asTableIdentifier)) {
+ qualifyV1Ident(parsed)
+ } else {
+ parsed
}
- }
-
- private def getTable3LNamespace(tableName: String): Table = {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName)
- makeTable(ident)
+ makeTable(nameParts)
}
/**
@@ -431,10 +405,12 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* `AnalysisException` when no `Table` can be found.
*/
override def getTable(dbName: String, tableName: String): Table = {
- if (tableExists(dbName, tableName)) {
- makeTable(TableIdentifier(tableName, Option(dbName)))
+ if (sessionCatalog.isGlobalTempViewDB(dbName)) {
+ makeTable(Seq(dbName, tableName))
} else {
- throw QueryCompilationErrors.tableOrViewNotFoundInDatabaseError(tableName, dbName)
+ // For backward compatibility (Spark 3.3 and prior), here we always look up the table from the
+ // Hive Metastore.
+ makeTable(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, tableName))
}
}
@@ -443,19 +419,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* function. This throws an `AnalysisException` when no `Function` can be found.
*/
override def getFunction(functionName: String): Function = {
- // calling `sqlParser.parseFunctionIdentifier` to parse functionName. If it contains only
- // function name and optionally contains a database name(thus a FunctionIdentifier), then
- // we look up the function in sessionCatalog.
- // Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of string as
- // the qualified identifier and resolve the function through SQL analyzer.
- try {
- val ident = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName)
- getFunction(ident.database.orNull, ident.funcName)
- } catch {
- case e: org.apache.spark.sql.catalyst.parser.ParseException =>
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(functionName)
- makeFunction(ident)
+ val parsed = parseIdent(functionName)
+ // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+ // the Hive Metastore first.
+ val nameParts = if (parsed.length <= 2 &&
+ !sessionCatalog.isTemporaryFunction(parsed.asFunctionIdentifier) &&
+ sessionCatalog.isPersistentFunction(parsed.asFunctionIdentifier)) {
+ qualifyV1Ident(parsed)
+ } else {
+ parsed
}
+ makeFunction(nameParts)
}
/**
@@ -463,7 +437,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* found.
*/
override def getFunction(dbName: String, functionName: String): Function = {
- makeFunction(FunctionIdentifier(functionName, Option(dbName)))
+ // For backward compatibility (Spark 3.3 and prior), here we always look up the function from
+ // the Hive Metastore.
+ makeFunction(Seq(CatalogManager.SESSION_CATALOG_NAME, dbName, functionName))
}
/**
@@ -471,15 +447,13 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def databaseExists(dbName: String): Boolean = {
// To maintain backwards compatibility, we first treat the input as a simple dbName and check
- // if sessionCatalog contains it. If not, we try to parse it as a 3-part name. If the parsed
- // identifier contains both catalog name and database name, we then search the database in the
- // catalog.
+ // if sessionCatalog contains it. If not, we try to parse it, resolve the catalog and namespace,
+ // and check if the namespace exists in the catalog.
if (!sessionCatalog.databaseExists(dbName)) {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
- val plan = sparkSession.sessionState.executePlan(UnresolvedNamespace(ident)).analyzed
- plan match {
- case ResolvedNamespace(catalog: SupportsNamespaces, _) =>
- catalog.namespaceExists(ident.slice(1, ident.size).toArray)
+ val plan = UnresolvedNamespace(parseIdent(dbName))
+ sparkSession.sessionState.executePlan(plan).analyzed match {
+ case ResolvedNamespace(catalog: SupportsNamespaces, ns) =>
+ catalog.namespaceExists(ns.toArray)
case _ => true
}
} else {
@@ -492,11 +466,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* view or a table/view.
*/
override def tableExists(tableName: String): Boolean = {
- try {
- getTable(tableName)
- true
- } catch {
- case e: AnalysisException => false
+ val parsed = parseIdent(tableName)
+ // For backward compatibility (Spark 3.3 and prior), we should check if the table exists in
+ // the Hive Metastore first. This also checks if it's a temp view.
+ (parsed.length <= 2 && {
+ val v1Ident = parsed.asTableIdentifier
+ sessionCatalog.isTempView(v1Ident) || sessionCatalog.tableExists(v1Ident)
+ }) || {
+ val plan = UnresolvedIdentifier(parsed)
+ sparkSession.sessionState.executePlan(plan).analyzed match {
+ case ResolvedIdentifier(catalog: TableCatalog, ident) => catalog.tableExists(ident)
+ case _ => false
+ }
}
}
@@ -513,22 +494,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* or a function.
*/
override def functionExists(functionName: String): Boolean = {
- try {
- val ident = sparkSession.sessionState.sqlParser.parseFunctionIdentifier(functionName)
- functionExists(ident.database.orNull, ident.funcName)
- } catch {
- case e: org.apache.spark.sql.catalyst.parser.ParseException =>
- try {
- val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(functionName)
- val plan = UnresolvedFunc(ident, "Catalog.functionExists", false, None)
- sparkSession.sessionState.executePlan(plan).analyzed match {
- case _: ResolvedPersistentFunc => true
- case _: ResolvedNonPersistentFunc => true
- case _ => false
- }
- } catch {
- case _: org.apache.spark.sql.AnalysisException => false
- }
+ val parsed = parseIdent(functionName)
+ // For backward compatibility (Spark 3.3 and prior), we should check if the function exists in
+ // the Hive Metastore first. This also checks if it's a built-in/temp function.
+ (parsed.length <= 2 && sessionCatalog.functionExists(parsed.asFunctionIdentifier)) || {
+ val plan = UnresolvedIdentifier(parsed)
+ sparkSession.sessionState.executePlan(plan).analyzed match {
+ case ResolvedIdentifier(catalog: FunctionCatalog, ident) => catalog.functionExists(ident)
+ case _ => false
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
index 8a635807abb..1a9baecfa74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
import org.apache.spark.sql.catalog.Table
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, HintInfo, Join, JoinHint}
-import org.apache.spark.sql.connector.catalog.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
@@ -166,7 +165,7 @@ class GlobalTempViewSuite extends QueryTest with SharedSparkSession {
assert(spark.catalog.tableExists(globalTempDB, "src"))
assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
name = "src",
- catalog = CatalogManager.SESSION_CATALOG_NAME,
+ catalog = null,
namespace = Array(globalTempDB),
description = null,
tableType = "TEMPORARY",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 0de48325d98..ab26a4fcc35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -182,6 +182,31 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table2"))
}
+ test("SPARK-39828: Catalog.listTables() should respect currentCatalog") {
+ assert(spark.catalog.currentCatalog() == "spark_catalog")
+ assert(spark.catalog.listTables().collect().isEmpty)
+ createTable("my_table1")
+ assert(spark.catalog.listTables().collect().map(_.name).toSet == Set("my_table1"))
+
+ val catalogName = "testcat"
+ val dbName = "my_db"
+ val tableName = "my_table2"
+ val tableSchema = new StructType().add("i", "int")
+ val description = "this is a test managed table"
+ sql(s"CREATE NAMESPACE $catalogName.$dbName")
+
+ spark.catalog.setCurrentCatalog("testcat")
+ spark.catalog.setCurrentDatabase("my_db")
+ assert(spark.catalog.listTables().collect().isEmpty)
+
+ createTable(tableName, dbName, catalogName, classOf[FakeV2Provider].getName, tableSchema,
+ Map.empty[String, String], description)
+ assert(spark.catalog.listTables()
+ .collect()
+ .map(t => Array(t.catalog, t.namespace.mkString("."), t.name).mkString(".")).toSet ==
+ Set("testcat.my_db.my_table2"))
+ }
+
test("list tables with database") {
assert(spark.catalog.listTables("default").collect().isEmpty)
createDatabase("my_db1")
@@ -229,6 +254,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(!funcNames2.contains("my_temp_func"))
}
+ test("SPARK-39828: Catalog.listFunctions() should respect currentCatalog") {
+ assert(spark.catalog.currentCatalog() == "spark_catalog")
+ assert(Set("+", "current_database", "window").subsetOf(
+ spark.catalog.listFunctions().collect().map(_.name).toSet))
+ createFunction("my_func")
+ assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
+
+ sql(s"CREATE NAMESPACE testcat.ns")
+ spark.catalog.setCurrentCatalog("testcat")
+ spark.catalog.setCurrentDatabase("ns")
+
+ val funcCatalog = spark.sessionState.catalogManager.catalog("testcat")
+ .asInstanceOf[InMemoryCatalog]
+ val function: UnboundFunction = new UnboundFunction {
+ override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {
+ override def inputTypes(): Array[DataType] = Array(IntegerType)
+ override def resultType(): DataType = IntegerType
+ override def name(): String = "my_bound_function"
+ }
+ override def description(): String = "my_function"
+ override def name(): String = "my_function"
+ }
+ assert(!spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
+ funcCatalog.createFunction(Identifier.of(Array("ns"), "my_func"), function)
+ assert(spark.catalog.listFunctions().collect().map(_.name).contains("my_func"))
+ }
+
test("list functions with database") {
assert(Set("+", "current_database", "window").subsetOf(
spark.catalog.listFunctions().collect().map(_.name).toSet))
@@ -283,7 +335,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
testListColumns("tab1", dbName = Some("db1"))
}
- test("SPARK-39615: three layer namespace compatibility - listColumns") {
+ test("SPARK-39615: qualified name with catalog - listColumns") {
val answers = Map(
"col1" -> ("int", true, false, true),
"col2" -> ("string", true, false, false),
@@ -637,7 +689,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(errMsg.contains("my_temp_table is a temp view. 'recoverPartitions()' expects a table"))
}
- test("three layer namespace compatibility - create managed table") {
+ test("qualified name with catalog - create managed table") {
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table"
@@ -656,7 +708,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(table.properties().get("comment").equals(description))
}
- test("three layer namespace compatibility - create external table") {
+ test("qualified name with catalog - create external table") {
withTempDir { dir =>
val catalogName = "testcat"
val dbName = "my_db"
@@ -680,7 +732,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
}
}
- test("three layer namespace compatibility - list tables") {
+ test("qualified name with catalog - list tables") {
withTempDir { dir =>
val catalogName = "testcat"
val dbName = "my_db"
@@ -729,7 +781,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
Set("my_table1", "my_table2", "my_temp_table"))
}
- test("three layer namespace compatibility - get table") {
+ test("qualified name with catalog - get table") {
val catalogName = "testcat"
val dbName = "default"
val tableName = "my_table"
@@ -757,7 +809,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME)
}
- test("three layer namespace compatibility - table exists") {
+ test("qualified name with catalog - table exists") {
val catalogName = "testcat"
val dbName = "my_db"
val tableName = "my_table"
@@ -781,7 +833,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString(".")))
}
- test("three layer namespace compatibility - database exists") {
+ test("qualified name with catalog - database exists") {
val catalogName = "testcat"
val dbName = "my_db"
assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString(".")))
@@ -793,7 +845,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString(".")))
}
- test("SPARK-39506: three layer namespace compatibility - cache table, isCached and" +
+ test("SPARK-39506: qualified name with catalog - cache table, isCached and" +
"uncacheTable") {
val tableSchema = new StructType().add("i", "int")
createTable("my_table", "my_db", "testcat", classOf[FakeV2Provider].getName,
@@ -840,7 +892,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
}
}
- test("three layer namespace compatibility - get database") {
+ test("qualified name with catalogy - get database") {
val catalogsAndDatabases =
Seq(("testcat", "somedb"), ("testcat", "ns.somedb"), ("spark_catalog", "somedb"))
catalogsAndDatabases.foreach { case (catalog, dbName) =>
@@ -863,7 +915,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
intercept[AnalysisException](spark.catalog.getDatabase("randomcat.db10"))
}
- test("three layer namespace compatibility - get database, same in hive and testcat") {
+ test("qualified name with catalog - get database, same in hive and testcat") {
// create 'testdb' in hive and testcat
val dbName = "testdb"
sql(s"CREATE NAMESPACE spark_catalog.$dbName COMMENT 'hive database'")
@@ -883,7 +935,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(spark.catalog.getDatabase(qualified).name === db)
}
- test("three layer namespace compatibility - set current database") {
+ test("qualified name with catalog - set current database") {
spark.catalog.setCurrentCatalog("testcat")
// namespace with the same name as catalog
sql("CREATE NAMESPACE testcat.testcat.my_db")
@@ -912,8 +964,7 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
assert(e3.contains("unknown_db"))
}
- test("SPARK-39579: Three layer namespace compatibility - " +
- "listFunctions, getFunction, functionExists") {
+ test("SPARK-39579: qualified name with catalog - listFunctions, getFunction, functionExists") {
createDatabase("my_db1")
createFunction("my_func1", Some("my_db1"))
@@ -931,8 +982,8 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
val func1b = spark.catalog.getFunction("spark_catalog.my_db1.my_func1")
assert(func1a.name === func1b.name && func1a.namespace === func1b.namespace &&
func1a.className === func1b.className && func1a.isTemporary === func1b.isTemporary)
- assert(func1a.catalog === null && func1b.catalog === "spark_catalog")
- assert(func1a.description === null && func1b.description === "N/A.")
+ assert(func1a.catalog === "spark_catalog" && func1b.catalog === "spark_catalog")
+ assert(func1a.description === "N/A." && func1b.description === "N/A.")
val function: UnboundFunction = new UnboundFunction {
override def bind(inputType: StructType): BoundFunction = new ScalarFunction[Int] {