You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/11 05:17:29 UTC
[spark] branch master updated: [SPARK-40020][SQL] Centralize the code of qualifying identifiers in SessionCatalog
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new abaec2804fb [SPARK-40020][SQL] Centralize the code of qualifying identifiers in SessionCatalog
abaec2804fb is described below
commit abaec2804fb48d2d378f7c3e99733af97283fa22
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Aug 11 13:16:48 2022 +0800
[SPARK-40020][SQL] Centralize the code of qualifying identifiers in SessionCatalog
### What changes were proposed in this pull request?
This PR is a refactor of `SessionCatalog`. It centralizes the code for qualifying identifiers in one place: `SessionCatalog.qualifyIdentifier`. We can then call `qualifyIdentifier` in the places (mostly in `SessionCatalog`) where we previously looked up the current database and formatted the names manually.
Attaching the v1 catalog name is also a form of qualifying identifiers, so it is included in `SessionCatalog.qualifyIdentifier` as well in this PR. This simplifies the code of https://github.com/apache/spark/pull/37021 : now we only need to attach the v1 catalog name in one place, instead of spreading it across many places.
This also improves the v1 command resolution: `ResolveSessionCatalog` now uses qualified identifiers when falling back to a v1 command, whereas before we only attached the v1 catalog name. This avoids potential issues where the v1 command execution may qualify the identifiers differently (e.g. if the current database changes).
### Why are the changes needed?
code cleanup
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
Closes #37415 from cloud-fan/follow.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 424 +++++++++++----------
.../apache/spark/sql/catalyst/identifiers.scala | 27 +-
.../sql/connector/catalog/CatalogV2Implicits.scala | 7 +-
.../sql/connector/catalog/LookupCatalog.scala | 5 +-
.../catalyst/catalog/ExternalCatalogSuite.scala | 13 +-
.../sql/catalyst/catalog/SessionCatalogSuite.scala | 39 +-
.../sql/connector/catalog/LookupCatalogSuite.scala | 4 +-
.../org/apache/spark/sql/DataFrameWriter.scala | 22 +-
.../catalyst/analysis/ResolveSessionCatalog.scala | 150 ++++----
.../spark/sql/execution/command/functions.scala | 10 +-
.../spark/sql/execution/command/tables.scala | 3 +-
.../apache/spark/sql/execution/command/views.scala | 20 +-
.../sql-tests/results/show-create-table.sql.out | 12 +-
.../test/resources/sql-tests/results/udaf.sql.out | 2 +-
.../sql-tests/results/udf/udf-udaf.sql.out | 2 +-
.../apache/spark/sql/execution/SQLViewSuite.scala | 2 +-
.../spark/sql/execution/command/DDLSuite.scala | 11 +-
.../execution/command/PlanResolutionSuite.scala | 38 +-
.../execution/command/v1/ShowFunctionsSuite.scala | 6 +
.../spark/sql/hive/client/HiveClientImpl.scala | 5 +-
.../apache/spark/sql/hive/client/HiveShim.scala | 5 +-
.../spark/sql/hive/MetastoreDataSourcesSuite.scala | 15 +-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 4 +-
.../execution/command/ShowFunctionsSuite.scala | 6 -
.../spark/sql/sources/HadoopFsRelationTest.scala | 3 +-
26 files changed, 425 insertions(+), 414 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a6108c2a3d3..aa177bcbcc8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1142,7 +1142,9 @@ class Analyzer(override val catalogManager: CatalogManager)
CatalogV2Util.loadTable(catalog, ident).map {
case v1Table: V1Table if CatalogV2Util.isSessionCatalog(catalog) &&
v1Table.v1Table.tableType == CatalogTableType.VIEW =>
- ResolvedView(ident, isTemp = false)
+ val v1Ident = v1Table.catalogTable.identifier
+ val v2Ident = Identifier.of(v1Ident.database.toArray, v1Ident.identifier)
+ ResolvedView(v2Ident, isTemp = false)
case table =>
ResolvedTable.create(catalog.asTableCatalog, ident, table)
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a0c98aac6c4..77da74ebc94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -33,7 +33,6 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, ExpressionInfo, UpCast}
@@ -131,7 +130,7 @@ class SessionCatalog(
// check whether the temporary view or function exists, then, if not, operate on
// the corresponding item in the current database.
@GuardedBy("this")
- protected var currentDb: String = formatDatabaseName(DEFAULT_DATABASE)
+ protected var currentDb: String = format(DEFAULT_DATABASE)
private val validNameFormat = "([\\w_]+)".r
@@ -149,17 +148,48 @@ class SessionCatalog(
}
/**
- * Format table name, taking into account case sensitivity.
+ * Formats object names, taking into account case sensitivity.
*/
- protected[this] def formatTableName(name: String): String = {
+ protected def format(name: String): String = {
if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
}
/**
- * Format database name, taking into account case sensitivity.
+ * Qualifies the table identifier with the current database if not specified, and normalize all
+ * the names.
*/
- protected[this] def formatDatabaseName(name: String): String = {
- if (conf.caseSensitiveAnalysis) name else name.toLowerCase(Locale.ROOT)
+ def qualifyIdentifier(ident: TableIdentifier): TableIdentifier = {
+ TableIdentifier(
+ table = format(ident.table),
+ database = getDatabase(ident),
+ catalog = getCatalog(ident))
+ }
+
+ /**
+ * Qualifies the function identifier with the current database if not specified, and normalize all
+ * the names.
+ */
+ def qualifyIdentifier(ident: FunctionIdentifier): FunctionIdentifier = {
+ FunctionIdentifier(
+ funcName = format(ident.funcName),
+ database = getDatabase(ident),
+ catalog = getCatalog(ident))
+ }
+
+ private def attachCatalogName(table: CatalogTable): CatalogTable = {
+ table.copy(identifier = table.identifier.copy(catalog = getCatalog(table.identifier)))
+ }
+
+ private def getDatabase(ident: CatalystIdentifier): Option[String] = {
+ Some(format(ident.database.getOrElse(getCurrentDatabase)))
+ }
+
+ private def getCatalog(ident: CatalystIdentifier): Option[String] = {
+ if (conf.getConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME)) {
+ ident.catalog
+ } else {
+ Some(format(ident.catalog.getOrElse(CatalogManager.SESSION_CATALOG_NAME)))
+ }
}
private val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
@@ -195,9 +225,8 @@ class SessionCatalog(
/** This method discards any cached table relation plans for the given table identifier. */
def invalidateCachedTable(name: TableIdentifier): Unit = {
- val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
- val tableName = formatTableName(name.table)
- invalidateCachedTable(QualifiedTableName(dbName, tableName))
+ val qualified = qualifyIdentifier(name)
+ invalidateCachedTable(QualifiedTableName(qualified.database.get, qualified.table))
}
/** This method provides a way to invalidate all the cached plans. */
@@ -223,15 +252,13 @@ class SessionCatalog(
private def requireTableExists(name: TableIdentifier): Unit = {
if (!tableExists(name)) {
- val db = name.database.getOrElse(currentDb)
- throw new NoSuchTableException(db = db, table = name.table)
+ throw new NoSuchTableException(db = name.database.get, table = name.table)
}
}
private def requireTableNotExists(name: TableIdentifier): Unit = {
if (tableExists(name)) {
- val db = name.database.getOrElse(currentDb)
- throw new TableAlreadyExistsException(db = db, table = name.table)
+ throw new TableAlreadyExistsException(db = name.database.get, table = name.table)
}
}
@@ -242,7 +269,7 @@ class SessionCatalog(
// ----------------------------------------------------------------------------
def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {
- val dbName = formatDatabaseName(dbDefinition.name)
+ val dbName = format(dbDefinition.name)
if (dbName == globalTempViewManager.database) {
throw QueryCompilationErrors.cannotCreateDatabaseWithSameNameAsPreservedDatabaseError(
globalTempViewManager.database)
@@ -258,7 +285,7 @@ class SessionCatalog(
}
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
if (dbName == DEFAULT_DATABASE) {
throw QueryCompilationErrors.cannotDropDefaultDatabaseError
}
@@ -274,20 +301,20 @@ class SessionCatalog(
}
def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
- val dbName = formatDatabaseName(dbDefinition.name)
+ val dbName = format(dbDefinition.name)
requireDbExists(dbName)
externalCatalog.alterDatabase(dbDefinition.copy(
name = dbName, locationUri = makeQualifiedDBPath(dbDefinition.locationUri)))
}
def getDatabaseMetadata(db: String): CatalogDatabase = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
requireDbExists(dbName)
externalCatalog.getDatabase(dbName)
}
def databaseExists(db: String): Boolean = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
externalCatalog.databaseExists(dbName)
}
@@ -302,7 +329,7 @@ class SessionCatalog(
def getCurrentDatabase: String = synchronized { currentDb }
def setCurrentDatabase(db: String): Unit = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
if (dbName == globalTempViewManager.database) {
throw QueryCompilationErrors.cannotUsePreservedDatabaseAsCurrentDatabaseError(
globalTempViewManager.database)
@@ -316,7 +343,7 @@ class SessionCatalog(
* by users.
*/
def getDefaultDBPath(db: String): URI = {
- CatalogUtils.stringToURI(formatDatabaseName(db) + ".db")
+ CatalogUtils.stringToURI(format(db) + ".db")
}
// ----------------------------------------------------------------------------
@@ -345,9 +372,9 @@ class SessionCatalog(
throw QueryCompilationErrors.createExternalTableWithoutLocationError
}
- val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableDefinition.identifier.table)
- val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+ val qualifiedIdent = qualifyIdentifier(tableDefinition.identifier)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
validateName(table)
val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
@@ -357,9 +384,9 @@ class SessionCatalog(
makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
- identifier = tableIdentifier)
+ identifier = qualifiedIdent)
} else {
- tableDefinition.copy(identifier = tableIdentifier)
+ tableDefinition.copy(identifier = qualifiedIdent)
}
requireDbExists(db)
@@ -395,7 +422,7 @@ class SessionCatalog(
} else if (new Path(locationUri).isAbsolute) {
makeQualifiedPath(locationUri)
} else {
- val dbName = formatDatabaseName(database)
+ val dbName = format(database)
val dbLocation = makeQualifiedDBPath(getDatabaseMetadata(dbName).locationUri)
new Path(new Path(dbLocation), CatalogUtils.URIToString(locationUri)).toUri
}
@@ -411,11 +438,10 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterTable(tableDefinition: CatalogTable): Unit = {
- val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableDefinition.identifier.table)
- val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+ val qualifiedIdent = qualifyIdentifier(tableDefinition.identifier)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(tableIdentifier)
+ requireTableExists(qualifiedIdent)
val newTableDefinition = if (tableDefinition.storage.locationUri.isDefined
&& !tableDefinition.storage.locationUri.get.isAbsolute) {
// make the location of the table qualified.
@@ -423,9 +449,9 @@ class SessionCatalog(
makeQualifiedTablePath(tableDefinition.storage.locationUri.get, db)
tableDefinition.copy(
storage = tableDefinition.storage.copy(locationUri = Some(qualifiedTableLocation)),
- identifier = tableIdentifier)
+ identifier = qualifiedIdent)
} else {
- tableDefinition.copy(identifier = tableIdentifier)
+ tableDefinition.copy(identifier = qualifiedIdent)
}
externalCatalog.alterTable(newTableDefinition)
@@ -442,11 +468,11 @@ class SessionCatalog(
def alterTableDataSchema(
identifier: TableIdentifier,
newDataSchema: StructType): Unit = {
- val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(identifier.table)
- val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+ val qualifiedIdent = qualifyIdentifier(identifier)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
requireDbExists(db)
- requireTableExists(tableIdentifier)
+ requireTableExists(qualifiedIdent)
val catalogTable = externalCatalog.getTable(db, table)
val oldDataSchema = catalogTable.dataSchema
@@ -469,14 +495,14 @@ class SessionCatalog(
* identifier.
*/
def alterTableStats(identifier: TableIdentifier, newStats: Option[CatalogStatistics]): Unit = {
- val db = formatDatabaseName(identifier.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(identifier.table)
- val tableIdentifier = attachSessionCatalog(TableIdentifier(table, Some(db)))
+ val qualifiedIdent = qualifyIdentifier(identifier)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
requireDbExists(db)
- requireTableExists(tableIdentifier)
+ requireTableExists(qualifiedIdent)
externalCatalog.alterTableStats(db, table, newStats)
// Invalidate the table relation cache
- refreshTable(identifier)
+ refreshTable(qualifiedIdent)
}
/**
@@ -484,9 +510,8 @@ class SessionCatalog(
* with current database.
*/
def tableExists(name: TableIdentifier): Boolean = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(name.table)
- externalCatalog.tableExists(db, table)
+ val qualifiedIdent = qualifyIdentifier(name)
+ externalCatalog.tableExists(qualifiedIdent.database.get, qualifiedIdent.table)
}
/**
@@ -509,11 +534,12 @@ class SessionCatalog(
@throws[NoSuchDatabaseException]
@throws[NoSuchTableException]
def getTableRawMetadata(name: TableIdentifier): CatalogTable = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(name.table)
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Some(db)))
- externalCatalog.getTable(db, table)
+ requireTableExists(qualifiedIdent)
+ attachCatalogName(externalCatalog.getTable(db, table))
}
/**
@@ -526,17 +552,17 @@ class SessionCatalog(
@throws[NoSuchDatabaseException]
def getTablesByName(names: Seq[TableIdentifier]): Seq[CatalogTable] = {
if (names.nonEmpty) {
- val dbs = names.map(_.database.getOrElse(getCurrentDatabase))
+ val qualifiedIdents = names.map(qualifyIdentifier)
+ val dbs = qualifiedIdents.map(_.database.get)
+ val tables = qualifiedIdents.map(_.table)
if (dbs.distinct.size != 1) {
- val tables = names.map(name => formatTableName(name.table))
val qualifiedTableNames = dbs.zip(tables).map { case (d, t) => QualifiedTableName(d, t)}
throw QueryCompilationErrors.cannotRetrieveTableOrViewNotInSameDatabaseError(
qualifiedTableNames)
}
- val db = formatDatabaseName(dbs.head)
+ val db = dbs.head
requireDbExists(db)
- val tables = names.map(name => formatTableName(name.table))
- externalCatalog.getTablesByName(db, tables)
+ externalCatalog.getTablesByName(db, tables).map(attachCatalogName)
} else {
Seq.empty
}
@@ -552,10 +578,11 @@ class SessionCatalog(
loadPath: String,
isOverwrite: Boolean,
isSrcLocal: Boolean): Unit = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(name.table)
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Some(db)))
+ requireTableExists(qualifiedIdent)
externalCatalog.loadTable(db, table, loadPath, isOverwrite, isSrcLocal)
}
@@ -571,20 +598,20 @@ class SessionCatalog(
isOverwrite: Boolean,
inheritTableSpecs: Boolean,
isSrcLocal: Boolean): Unit = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(name.table)
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Some(db)))
+ requireTableExists(qualifiedIdent)
requireNonEmptyValueInPartitionSpec(Seq(spec))
externalCatalog.loadPartition(
db, table, loadPath, spec, isOverwrite, inheritTableSpecs, isSrcLocal)
}
def defaultTablePath(tableIdent: TableIdentifier): URI = {
- val dbName = formatDatabaseName(tableIdent.database.getOrElse(getCurrentDatabase))
- val dbLocation = getDatabaseMetadata(dbName).locationUri
-
- new Path(new Path(dbLocation), formatTableName(tableIdent.table)).toUri
+ val qualifiedIdent = qualifyIdentifier(tableIdent)
+ val dbLocation = getDatabaseMetadata(qualifiedIdent.database.get).locationUri
+ new Path(new Path(dbLocation), qualifiedIdent.table).toUri
}
// ----------------------------------------------
@@ -598,11 +625,11 @@ class SessionCatalog(
name: String,
viewDefinition: TemporaryViewRelation,
overrideIfExists: Boolean): Unit = synchronized {
- val table = formatTableName(name)
- if (tempViews.contains(table) && !overrideIfExists) {
+ val normalized = format(name)
+ if (tempViews.contains(normalized) && !overrideIfExists) {
throw new TempTableAlreadyExistsException(name)
}
- tempViews.put(table, viewDefinition)
+ tempViews.put(normalized, viewDefinition)
}
/**
@@ -612,7 +639,7 @@ class SessionCatalog(
name: String,
viewDefinition: TemporaryViewRelation,
overrideIfExists: Boolean): Unit = {
- globalTempViewManager.create(formatTableName(name), viewDefinition, overrideIfExists)
+ globalTempViewManager.create(format(name), viewDefinition, overrideIfExists)
}
/**
@@ -622,7 +649,7 @@ class SessionCatalog(
def alterTempViewDefinition(
name: TableIdentifier,
viewDefinition: TemporaryViewRelation): Boolean = synchronized {
- val viewName = formatTableName(name.table)
+ val viewName = format(name.table)
if (name.database.isEmpty) {
if (tempViews.contains(viewName)) {
createTempView(viewName, viewDefinition, overrideIfExists = true)
@@ -630,7 +657,7 @@ class SessionCatalog(
} else {
false
}
- } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ } else if (format(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.update(viewName, viewDefinition)
} else {
false
@@ -641,7 +668,7 @@ class SessionCatalog(
* Return a local temporary view exactly as it was stored.
*/
def getRawTempView(name: String): Option[TemporaryViewRelation] = synchronized {
- tempViews.get(formatTableName(name))
+ tempViews.get(format(name))
}
/**
@@ -659,7 +686,7 @@ class SessionCatalog(
* Return a global temporary view exactly as it was stored.
*/
def getRawGlobalTempView(name: String): Option[TemporaryViewRelation] = {
- globalTempViewManager.get(formatTableName(name))
+ globalTempViewManager.get(format(name))
}
/**
@@ -675,7 +702,7 @@ class SessionCatalog(
* Returns true if this view is dropped successfully, false otherwise.
*/
def dropTempView(name: String): Boolean = synchronized {
- tempViews.remove(formatTableName(name)).isDefined
+ tempViews.remove(format(name)).isDefined
}
/**
@@ -684,7 +711,7 @@ class SessionCatalog(
* Returns true if this view is dropped successfully, false otherwise.
*/
def dropGlobalTempView(name: String): Boolean = {
- globalTempViewManager.remove(formatTableName(name))
+ globalTempViewManager.remove(format(name))
}
// -------------------------------------------------------------
@@ -701,10 +728,10 @@ class SessionCatalog(
* current database.
*/
def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
- val table = formatTableName(name.table)
+ val table = format(name.table)
if (name.database.isEmpty) {
tempViews.get(table).map(_.tableMeta).getOrElse(getTableMetadata(name))
- } else if (formatDatabaseName(name.database.get) == globalTempViewManager.database) {
+ } else if (format(name.database.get) == globalTempViewManager.database) {
globalTempViewManager.get(table).map(_.tableMeta)
.getOrElse(throw new NoSuchTableException(globalTempViewManager.database, table))
} else {
@@ -722,23 +749,23 @@ class SessionCatalog(
* This assumes the database specified in `newName` matches the one in `oldName`.
*/
def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = synchronized {
- val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
- newName.database.map(formatDatabaseName).foreach { newDb =>
+ val qualifiedIdent = qualifyIdentifier(oldName)
+ val db = qualifiedIdent.database.get
+ newName.database.map(format).foreach { newDb =>
if (db != newDb) {
throw QueryCompilationErrors.renameTableSourceAndDestinationMismatchError(db, newDb)
}
}
- val oldTableName = formatTableName(oldName.table)
- val newTableName = formatTableName(newName.table)
+ val oldTableName = qualifiedIdent.table
+ val newTableName = format(newName.table)
if (db == globalTempViewManager.database) {
globalTempViewManager.rename(oldTableName, newTableName)
} else {
requireDbExists(db)
if (oldName.database.isDefined || !tempViews.contains(oldTableName)) {
validateName(newTableName)
- validateNewLocationOfRename(
- TableIdentifier(oldTableName, Some(db)), TableIdentifier(newTableName, Some(db)))
+ validateNewLocationOfRename(qualifiedIdent, qualifyIdentifier(newName))
externalCatalog.renameTable(db, oldTableName, newTableName)
} else {
if (newName.database.isDefined) {
@@ -767,8 +794,9 @@ class SessionCatalog(
name: TableIdentifier,
ignoreIfNotExists: Boolean,
purge: Boolean): Unit = synchronized {
- val db = formatDatabaseName(name.database.getOrElse(currentDb))
- val table = formatTableName(name.table)
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
if (db == globalTempViewManager.database) {
val viewExists = globalTempViewManager.remove(table)
if (!viewExists && !ignoreIfNotExists) {
@@ -779,7 +807,7 @@ class SessionCatalog(
requireDbExists(db)
// When ignoreIfNotExists is false, no exception is issued when the table does not exist.
// Instead, log it as an error message.
- if (tableExists(TableIdentifier(table, Option(db)))) {
+ if (tableExists(qualifiedIdent)) {
externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = purge)
} else if (!ignoreIfNotExists) {
throw new NoSuchTableException(db = db, table = table)
@@ -806,29 +834,28 @@ class SessionCatalog(
*
* @param name The name of the table/view that we look up.
*/
- def lookupRelation(name: TableIdentifier): LogicalPlan = {
- synchronized {
- val db = formatDatabaseName(name.database.getOrElse(currentDb))
- val table = formatTableName(name.table)
- if (db == globalTempViewManager.database) {
- globalTempViewManager.get(table).map { viewDef =>
- SubqueryAlias(table, db, getTempViewPlan(viewDef))
- }.getOrElse(throw new NoSuchTableException(db, table))
- } else if (name.database.isDefined || !tempViews.contains(table)) {
- val metadata = externalCatalog.getTable(db, table)
- getRelation(metadata)
- } else {
- SubqueryAlias(table, getTempViewPlan(tempViews(table)))
- }
+ def lookupRelation(name: TableIdentifier): LogicalPlan = synchronized {
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
+ if (db == globalTempViewManager.database) {
+ globalTempViewManager.get(table).map { viewDef =>
+ SubqueryAlias(table, db, getTempViewPlan(viewDef))
+ }.getOrElse(throw new NoSuchTableException(db, table))
+ } else if (name.database.isDefined || !tempViews.contains(table)) {
+ val metadata = externalCatalog.getTable(db, table)
+ getRelation(metadata)
+ } else {
+ SubqueryAlias(table, getTempViewPlan(tempViews(table)))
}
}
def getRelation(
metadata: CatalogTable,
options: CaseInsensitiveStringMap = CaseInsensitiveStringMap.empty()): LogicalPlan = {
- val name = metadata.identifier
- val db = formatDatabaseName(name.database.getOrElse(currentDb))
- val table = formatTableName(name.table)
+ val qualifiedIdent = qualifyIdentifier(metadata.identifier)
+ val db = qualifiedIdent.database.get
+ val table = qualifiedIdent.table
val multiParts = Seq(CatalogManager.SESSION_CATALOG_NAME, db, table)
if (metadata.tableType == CatalogTableType.VIEW) {
@@ -939,16 +966,16 @@ class SessionCatalog(
}
def lookupTempView(table: String): Option[SubqueryAlias] = {
- val formattedTable = formatTableName(table)
+ val formattedTable = format(table)
getTempView(formattedTable).map { view =>
SubqueryAlias(formattedTable, view)
}
}
def lookupGlobalTempView(db: String, table: String): Option[SubqueryAlias] = {
- val formattedDB = formatDatabaseName(db)
+ val formattedDB = format(db)
if (formattedDB == globalTempViewManager.database) {
- val formattedTable = formatTableName(table)
+ val formattedTable = format(table)
getGlobalTempView(formattedTable).map { view =>
SubqueryAlias(formattedTable, formattedDB, view)
}
@@ -1036,7 +1063,7 @@ class SessionCatalog(
db: String,
pattern: String,
includeLocalTempViews: Boolean): Seq[TableIdentifier] = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
val dbTables = if (dbName == globalTempViewManager.database) {
globalTempViewManager.listViewNames(pattern).map { name =>
TableIdentifier(name, Some(globalTempViewManager.database))
@@ -1059,7 +1086,7 @@ class SessionCatalog(
* List all matching views in the specified database, including local temporary views.
*/
def listViews(db: String, pattern: String): Seq[TableIdentifier] = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
val dbViews = if (dbName == globalTempViewManager.database) {
globalTempViewManager.listViewNames(pattern).map { name =>
TableIdentifier(name, Some(globalTempViewManager.database))
@@ -1109,9 +1136,8 @@ class SessionCatalog(
*/
def refreshTable(name: TableIdentifier): Unit = synchronized {
lookupTempView(name).map(_.refresh).getOrElse {
- val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
- val tableName = formatTableName(name.table)
- val qualifiedTableName = QualifiedTableName(dbName, tableName)
+ val qualifiedIdent = qualifyIdentifier(name)
+ val qualifiedTableName = QualifiedTableName(qualifiedIdent.database.get, qualifiedIdent.table)
tableRelationCache.invalidate(qualifiedTableName)
}
}
@@ -1144,14 +1170,14 @@ class SessionCatalog(
tableName: TableIdentifier,
parts: Seq[CatalogTablePartition],
ignoreIfExists: Boolean): Unit = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
- requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+ requireTableExists(qualifiedIdent)
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(qualifiedIdent))
requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
externalCatalog.createPartitions(
- db, table, partitionWithQualifiedPath(tableName, parts), ignoreIfExists)
+ db, qualifiedIdent.table, partitionWithQualifiedPath(qualifiedIdent, parts), ignoreIfExists)
}
/**
@@ -1164,13 +1190,14 @@ class SessionCatalog(
ignoreIfNotExists: Boolean,
purge: Boolean,
retainData: Boolean): Unit = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
- requirePartialMatchedPartitionSpec(specs, getTableMetadata(tableName))
+ requireTableExists(qualifiedIdent)
+ requirePartialMatchedPartitionSpec(specs, getTableMetadata(qualifiedIdent))
requireNonEmptyValueInPartitionSpec(specs)
- externalCatalog.dropPartitions(db, table, specs, ignoreIfNotExists, purge, retainData)
+ externalCatalog.dropPartitions(
+ db, qualifiedIdent.table, specs, ignoreIfNotExists, purge, retainData)
}
/**
@@ -1183,16 +1210,16 @@ class SessionCatalog(
tableName: TableIdentifier,
specs: Seq[TablePartitionSpec],
newSpecs: Seq[TablePartitionSpec]): Unit = {
- val tableMetadata = getTableMetadata(tableName)
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
+ val tableMetadata = getTableMetadata(qualifiedIdent)
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
+ requireTableExists(qualifiedIdent)
requireExactMatchedPartitionSpec(specs, tableMetadata)
requireExactMatchedPartitionSpec(newSpecs, tableMetadata)
requireNonEmptyValueInPartitionSpec(specs)
requireNonEmptyValueInPartitionSpec(newSpecs)
- externalCatalog.renamePartitions(db, table, specs, newSpecs)
+ externalCatalog.renamePartitions(db, qualifiedIdent.table, specs, newSpecs)
}
/**
@@ -1205,13 +1232,14 @@ class SessionCatalog(
* this becomes a no-op.
*/
def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
- requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(tableName))
+ requireTableExists(qualifiedIdent)
+ requireExactMatchedPartitionSpec(parts.map(_.spec), getTableMetadata(qualifiedIdent))
requireNonEmptyValueInPartitionSpec(parts.map(_.spec))
- externalCatalog.alterPartitions(db, table, partitionWithQualifiedPath(tableName, parts))
+ externalCatalog.alterPartitions(
+ db, qualifiedIdent.table, partitionWithQualifiedPath(qualifiedIdent, parts))
}
/**
@@ -1219,13 +1247,13 @@ class SessionCatalog(
* If no database is specified, assume the table is in the current database.
*/
def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
- requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requireTableExists(qualifiedIdent)
+ requireExactMatchedPartitionSpec(Seq(spec), getTableMetadata(qualifiedIdent))
requireNonEmptyValueInPartitionSpec(Seq(spec))
- externalCatalog.getPartition(db, table, spec)
+ externalCatalog.getPartition(db, qualifiedIdent.table, spec)
}
/**
@@ -1238,15 +1266,15 @@ class SessionCatalog(
def listPartitionNames(
tableName: TableIdentifier,
partialSpec: Option[TablePartitionSpec] = None): Seq[String] = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
+ requireTableExists(qualifiedIdent)
partialSpec.foreach { spec =>
- requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(qualifiedIdent))
requireNonEmptyValueInPartitionSpec(Seq(spec))
}
- externalCatalog.listPartitionNames(db, table, partialSpec)
+ externalCatalog.listPartitionNames(db, qualifiedIdent.table, partialSpec)
}
/**
@@ -1259,15 +1287,15 @@ class SessionCatalog(
def listPartitions(
tableName: TableIdentifier,
partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
+ requireTableExists(qualifiedIdent)
partialSpec.foreach { spec =>
- requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(tableName))
+ requirePartialMatchedPartitionSpec(Seq(spec), getTableMetadata(qualifiedIdent))
requireNonEmptyValueInPartitionSpec(Seq(spec))
}
- externalCatalog.listPartitions(db, table, partialSpec)
+ externalCatalog.listPartitions(db, qualifiedIdent.table, partialSpec)
}
/**
@@ -1277,11 +1305,12 @@ class SessionCatalog(
def listPartitionsByFilter(
tableName: TableIdentifier,
predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
- val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
- val table = formatTableName(tableName.table)
+ val qualifiedIdent = qualifyIdentifier(tableName)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- requireTableExists(TableIdentifier(table, Option(db)))
- externalCatalog.listPartitionsByFilter(db, table, predicates, conf.sessionLocalTimeZone)
+ requireTableExists(qualifiedIdent)
+ externalCatalog.listPartitionsByFilter(
+ db, qualifiedIdent.table, predicates, conf.sessionLocalTimeZone)
}
/**
@@ -1368,15 +1397,14 @@ class SessionCatalog(
* in the specified database.
*/
def createFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = {
- val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+ val qualifiedIdent = qualifyIdentifier(funcDefinition.identifier)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- val identifier = attachSessionCatalog(
- FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- val newFuncDefinition = funcDefinition.copy(identifier = identifier)
- if (!functionExists(identifier)) {
+ val newFuncDefinition = funcDefinition.copy(identifier = qualifiedIdent)
+ if (!functionExists(qualifiedIdent)) {
externalCatalog.createFunction(db, newFuncDefinition)
} else if (!ignoreIfExists) {
- throw new FunctionAlreadyExistsException(db = db, func = identifier.toString)
+ throw new FunctionAlreadyExistsException(db = db, func = qualifiedIdent.funcName)
}
}
@@ -1385,20 +1413,21 @@ class SessionCatalog(
* If no database is specified, assume the function is in the current database.
*/
def dropFunction(name: FunctionIdentifier, ignoreIfNotExists: Boolean): Unit = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val funcName = qualifiedIdent.funcName
requireDbExists(db)
- val identifier = attachSessionCatalog(name.copy(database = Some(db)))
- if (functionExists(identifier)) {
- if (functionRegistry.functionExists(identifier)) {
+ if (functionExists(qualifiedIdent)) {
+ if (functionRegistry.functionExists(qualifiedIdent)) {
// If we have loaded this function into the FunctionRegistry,
// also drop it from there.
// For a permanent function, because we loaded it to the FunctionRegistry
// when it's first used, we also need to drop it from the FunctionRegistry.
- functionRegistry.dropFunction(identifier)
+ functionRegistry.dropFunction(qualifiedIdent)
}
- externalCatalog.dropFunction(db, name.funcName)
+ externalCatalog.dropFunction(db, funcName)
} else if (!ignoreIfNotExists) {
- throw new NoSuchPermanentFunctionException(db = db, func = identifier.toString)
+ throw new NoSuchPermanentFunctionException(db = db, func = funcName)
}
}
@@ -1407,22 +1436,21 @@ class SessionCatalog(
* If no database is specified, assume the function is in the current database.
*/
def alterFunction(funcDefinition: CatalogFunction): Unit = {
- val db = formatDatabaseName(funcDefinition.identifier.database.getOrElse(getCurrentDatabase))
+ val qualifiedIdent = qualifyIdentifier(funcDefinition.identifier)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- val identifier = attachSessionCatalog(
- FunctionIdentifier(funcDefinition.identifier.funcName, Some(db)))
- val newFuncDefinition = funcDefinition.copy(identifier = identifier)
- if (functionExists(identifier)) {
- if (functionRegistry.functionExists(identifier)) {
+ val newFuncDefinition = funcDefinition.copy(identifier = qualifiedIdent)
+ if (functionExists(qualifiedIdent)) {
+ if (functionRegistry.functionExists(qualifiedIdent)) {
// If we have loaded this function into the FunctionRegistry,
// also drop it from there.
// For a permanent function, because we loaded it to the FunctionRegistry
// when it's first used, we also need to drop it from the FunctionRegistry.
- functionRegistry.dropFunction(identifier)
+ functionRegistry.dropFunction(qualifiedIdent)
}
externalCatalog.alterFunction(db, newFuncDefinition)
} else {
- throw new NoSuchPermanentFunctionException(db = db, func = identifier.toString)
+ throw new NoSuchPermanentFunctionException(db = db, func = qualifiedIdent.funcName)
}
}
@@ -1433,9 +1461,10 @@ class SessionCatalog(
* If no database is specified, this will return the function in the current database.
*/
def getFunctionMetadata(name: FunctionIdentifier): CatalogFunction = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- externalCatalog.getFunction(db, name.funcName)
+ externalCatalog.getFunction(db, qualifiedIdent.funcName).copy(identifier = qualifiedIdent)
}
/**
@@ -1443,9 +1472,10 @@ class SessionCatalog(
*/
def functionExists(name: FunctionIdentifier): Boolean = {
functionRegistry.functionExists(name) || tableFunctionRegistry.functionExists(name) || {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
requireDbExists(db)
- externalCatalog.functionExists(db, name.funcName)
+ externalCatalog.functionExists(db, qualifiedIdent.funcName)
}
}
@@ -1554,8 +1584,10 @@ class SessionCatalog(
* Returns whether it is a persistent function. If not existed, returns false.
*/
def isPersistentFunction(name: FunctionIdentifier): Boolean = {
- val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
- databaseExists(db) && externalCatalog.functionExists(db, name.funcName)
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val funcName = qualifiedIdent.funcName
+ databaseExists(db) && externalCatalog.functionExists(db, funcName)
}
/**
@@ -1660,16 +1692,16 @@ class SessionCatalog(
* This supports both scalar and table functions.
*/
def lookupPersistentFunction(name: FunctionIdentifier): ExpressionInfo = {
- val database = name.database.orElse(Some(currentDb)).map(formatDatabaseName)
- val qualifiedName = attachSessionCatalog(name.copy(database = database))
- functionRegistry.lookupFunction(qualifiedName)
- .orElse(tableFunctionRegistry.lookupFunction(qualifiedName))
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val funcName = qualifiedIdent.funcName
+ functionRegistry.lookupFunction(qualifiedIdent)
+ .orElse(tableFunctionRegistry.lookupFunction(qualifiedIdent))
.getOrElse {
- val db = qualifiedName.database.get
requireDbExists(db)
- if (externalCatalog.functionExists(db, name.funcName)) {
- val metadata = externalCatalog.getFunction(db, name.funcName)
- makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedName))
+ if (externalCatalog.functionExists(db, funcName)) {
+ val metadata = externalCatalog.getFunction(db, funcName)
+ makeExprInfoForHiveFunction(metadata.copy(identifier = qualifiedIdent))
} else {
failFunctionLookup(name)
}
@@ -1700,19 +1732,20 @@ class SessionCatalog(
arguments: Seq[Expression],
registry: FunctionRegistryBase[T],
createFunctionBuilder: CatalogFunction => FunctionRegistryBase[T]#FunctionBuilder): T = {
- val database = formatDatabaseName(name.database.getOrElse(currentDb))
- val qualifiedName = name.copy(database = Some(database))
- if (registry.functionExists(qualifiedName)) {
+ val qualifiedIdent = qualifyIdentifier(name)
+ val db = qualifiedIdent.database.get
+ val funcName = qualifiedIdent.funcName
+ if (registry.functionExists(qualifiedIdent)) {
// This function has been already loaded into the function registry.
- registry.lookupFunction(qualifiedName, arguments)
+ registry.lookupFunction(qualifiedIdent, arguments)
} else {
// The function has not been loaded to the function registry, which means
// that the function is a persistent function (if it actually has been registered
// in the metastore). We need to first put the function in the function registry.
val catalogFunction = try {
- externalCatalog.getFunction(database, qualifiedName.funcName)
+ externalCatalog.getFunction(db, funcName)
} catch {
- case _: AnalysisException => failFunctionLookup(qualifiedName)
+ case _: AnalysisException => failFunctionLookup(qualifiedIdent)
}
loadFunctionResources(catalogFunction.resources)
// Please note that qualifiedName is provided by the user. However,
@@ -1720,14 +1753,14 @@ class SessionCatalog(
// catalog. So, it is possible that qualifiedName is not exactly the same as
// catalogFunction.identifier.unquotedString (difference is on case-sensitivity).
// At here, we preserve the input from the user.
- val funcMetadata = catalogFunction.copy(identifier = qualifiedName)
+ val funcMetadata = catalogFunction.copy(identifier = qualifiedIdent)
registerFunction(
funcMetadata,
overrideIfExists = false,
registry = registry,
functionBuilder = createFunctionBuilder(funcMetadata))
// Now, we need to create the Expression.
- registry.lookupFunction(qualifiedName, arguments)
+ registry.lookupFunction(qualifiedIdent, arguments)
}
}
@@ -1794,7 +1827,7 @@ class SessionCatalog(
* defined).
*/
def listFunctions(db: String, pattern: String): Seq[(FunctionIdentifier, String)] = {
- val dbName = formatDatabaseName(db)
+ val dbName = format(db)
requireDbExists(dbName)
val dbFunctions = externalCatalog.listFunctions(dbName, pattern).map { f =>
FunctionIdentifier(f, Some(dbName)) }
@@ -1805,7 +1838,8 @@ class SessionCatalog(
functions.map {
case f if FunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
case f if TableFunctionRegistry.functionSet.contains(f) => (f, "SYSTEM")
- case f => (attachSessionCatalog(f), "USER")
+ case f if f.database.isDefined => (qualifyIdentifier(f), "USER")
+ case f => (f, "USER")
}.distinct
}
@@ -1890,7 +1924,7 @@ class SessionCatalog(
assert(oldName.database.nonEmpty)
val databaseLocation =
externalCatalog.getDatabase(oldName.database.get).locationUri
- val newTableLocation = new Path(new Path(databaseLocation), formatTableName(newName.table))
+ val newTableLocation = new Path(new Path(databaseLocation), format(newName.table))
val fs = newTableLocation.getFileSystem(hadoopConf)
if (fs.exists(newTableLocation)) {
throw QueryCompilationErrors.cannotOperateManagedTableWithExistingLocationError(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 2de44d6f349..926514ac62d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -17,9 +17,6 @@
package org.apache.spark.sql.catalyst
-import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-
/**
* An identifier that optionally specifies a database.
*
@@ -64,28 +61,6 @@ sealed trait CatalystIdentifier {
override def toString: String = quotedString
}
-object CatalystIdentifier {
- private def sessionCatalogOption(database: Option[String]): Option[String] = {
- if (!SQLConf.get.getConf(SQLConf.LEGACY_NON_IDENTIFIER_OUTPUT_CATALOG_NAME) &&
- database.isDefined &&
- database.get != SQLConf.get.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)) {
- Some(SESSION_CATALOG_NAME)
- } else {
- None
- }
- }
-
- def attachSessionCatalog(identifier: TableIdentifier): TableIdentifier = {
- val catalog = identifier.catalog.orElse(sessionCatalogOption(identifier.database))
- identifier.copy(catalog = catalog)
- }
-
- def attachSessionCatalog(identifier: FunctionIdentifier): FunctionIdentifier = {
- val catalog = identifier.catalog.orElse(sessionCatalogOption(identifier.database))
- identifier.copy(catalog = catalog)
- }
-}
-
/**
* Encapsulates an identifier that is either an alias name or an identifier that has table
* name and a qualifier.
@@ -113,6 +88,7 @@ object AliasIdentifier {
*/
case class TableIdentifier(table: String, database: Option[String], catalog: Option[String])
extends CatalystIdentifier {
+ assert(catalog.isEmpty || database.isDefined)
override val identifier: String = table
@@ -138,6 +114,7 @@ object TableIdentifier {
*/
case class FunctionIdentifier(funcName: String, database: Option[String], catalog: Option[String])
extends CatalystIdentifier {
+ assert(catalog.isEmpty || database.isDefined)
override val identifier: String = funcName
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index a6c2aa8b86a..91809b6176c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.connector.catalog
import scala.collection.mutable
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.quoteIfNeeded
@@ -133,8 +132,7 @@ private[sql] object CatalogV2Implicits {
def asTableIdentifier: TableIdentifier = ident.namespace match {
case ns if ns.isEmpty => TableIdentifier(ident.name)
- case Array(dbName) =>
- attachSessionCatalog(TableIdentifier(ident.name, Some(dbName)))
+ case Array(dbName) => TableIdentifier(ident.name, Some(dbName))
case _ =>
throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
quoted, "TableIdentifier")
@@ -142,8 +140,7 @@ private[sql] object CatalogV2Implicits {
def asFunctionIdentifier: FunctionIdentifier = ident.namespace() match {
case ns if ns.isEmpty => FunctionIdentifier(ident.name())
- case Array(dbName) =>
- attachSessionCatalog(FunctionIdentifier(ident.name(), Some(dbName)))
+ case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
case _ =>
throw QueryCompilationErrors.identifierHavingMoreThanTwoNamePartsError(
quoted, "FunctionIdentifier")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index 5af1322f909..21f4258bce6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector.catalog
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
/**
@@ -137,8 +137,7 @@ private[sql] trait LookupCatalog extends Logging {
def unapply(parts: Seq[String]): Option[TableIdentifier] = {
def namesToTableIdentifier(names: Seq[String]): Option[TableIdentifier] = names match {
case Seq(name) => Some(TableIdentifier(name))
- case Seq(database, name) =>
- Some(CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, Some(database))))
+ case Seq(database, name) => Some(TableIdentifier(name, Some(database)))
case _ => None
}
parts match {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 1b0a154a3f4..df981ed33e0 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -25,12 +25,11 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
-import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.connector.catalog.SupportsNamespaces.PROP_OWNER
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -281,13 +280,11 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
val catalog = newBasicCatalog()
val tables = catalog.getTablesByName("db2", Seq("tbl1", "tbl2"))
assert(tables.map(_.identifier.table).sorted == Seq("tbl1", "tbl2"))
- assert(tables.forall(_.identifier.catalog.isDefined))
// After renaming a table, the identifier should still be qualified with catalog.
catalog.renameTable("db2", "tbl1", "tblone")
val tables2 = catalog.getTablesByName("db2", Seq("tbl2", "tblone"))
assert(tables2.map(_.identifier.table).sorted == Seq("tbl2", "tblone"))
- assert(tables2.forall(_.identifier.catalog.isDefined))
}
test("get tables by name when some tables do not exists") {
@@ -775,7 +772,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite {
test("get function") {
val catalog = newBasicCatalog()
assert(catalog.getFunction("db2", "func1") ==
- CatalogFunction(FunctionIdentifier("func1", Some("db2"), Some(SESSION_CATALOG_NAME)),
+ CatalogFunction(FunctionIdentifier("func1", Some("db2")),
funcClass, Seq.empty[FunctionResource]))
intercept[NoSuchFunctionException] {
catalog.getFunction("db2", "does_not_exist")
@@ -1037,7 +1034,7 @@ abstract class CatalogTestUtils {
database: Option[String] = None,
defaultColumns: Boolean = false): CatalogTable = {
CatalogTable(
- identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, database)),
+ identifier = TableIdentifier(name, database),
tableType = CatalogTableType.EXTERNAL,
storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
schema = if (defaultColumns) {
@@ -1081,7 +1078,7 @@ abstract class CatalogTestUtils {
name: String,
props: Map[String, String]): CatalogTable = {
CatalogTable(
- identifier = CatalystIdentifier.attachSessionCatalog(TableIdentifier(name, Some(db))),
+ identifier = TableIdentifier(name, Some(db)),
tableType = CatalogTableType.VIEW,
storage = CatalogStorageFormat.empty,
schema = new StructType()
@@ -1095,7 +1092,7 @@ abstract class CatalogTestUtils {
def newFunc(name: String, database: Option[String] = None): CatalogFunction = {
CatalogFunction(
- CatalystIdentifier.attachSessionCatalog(FunctionIdentifier(name, database)),
+ FunctionIdentifier(name, database),
funcClass,
Seq.empty[FunctionResource])
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 9821c6bb2fe..62491e04831 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -501,16 +501,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
test("alter table") {
withBasicCatalog { catalog =>
- val tbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+ val tbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
catalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem")))
- val newTbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+ val newTbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
assert(!tbl1.properties.contains("toh"))
assert(newTbl1.properties.size == tbl1.properties.size + 1)
assert(newTbl1.properties.get("toh") == Some("frem"))
// Alter table without explicitly specifying database
catalog.setCurrentDatabase("db2")
catalog.alterTable(tbl1.copy(identifier = TableIdentifier("tbl1")))
- val newestTbl1 = catalog.externalCatalog.getTable("db2", "tbl1")
+ val newestTbl1 = catalog.getTableRawMetadata(TableIdentifier("tbl1", Some("db2")))
// For hive serde table, hive metastore will set transient_lastDdlTime in table's properties,
// and its value will be modified, here we ignore it when comparing the two tables.
assert(newestTbl1.copy(properties = Map.empty) == tbl1.copy(properties = Map.empty))
@@ -570,12 +570,13 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
test("get table") {
withBasicCatalog { catalog =>
- assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2")))
- == catalog.externalCatalog.getTable("db2", "tbl1"))
+ val raw = catalog.externalCatalog.getTable("db2", "tbl1")
+ val withCatalog = raw.copy(
+ identifier = raw.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+ assert(catalog.getTableMetadata(TableIdentifier("tbl1", Some("db2"))) == withCatalog)
// Get table without explicitly specifying database
catalog.setCurrentDatabase("db2")
- assert(catalog.getTableMetadata(TableIdentifier("tbl1"))
- == catalog.externalCatalog.getTable("db2", "tbl1"))
+ assert(catalog.getTableMetadata(TableIdentifier("tbl1")) == withCatalog)
}
}
@@ -592,12 +593,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
test("get tables by name") {
withBasicCatalog { catalog =>
+ val rawTables = catalog.externalCatalog.getTablesByName("db2", Seq("tbl1", "tbl2"))
+ val tablesWithCatalog = rawTables.map { t =>
+ t.copy(identifier = t.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+ }
assert(catalog.getTablesByName(
Seq(
TableIdentifier("tbl1", Some("db2")),
TableIdentifier("tbl2", Some("db2"))
)
- ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1", "tbl2")))
+ ) == tablesWithCatalog)
// Get table without explicitly specifying database
catalog.setCurrentDatabase("db2")
assert(catalog.getTablesByName(
@@ -605,18 +610,22 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
TableIdentifier("tbl1"),
TableIdentifier("tbl2")
)
- ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1", "tbl2")))
+ ) == tablesWithCatalog)
}
}
test("get tables by name when some tables do not exist") {
withBasicCatalog { catalog =>
+ val rawTables = catalog.externalCatalog.getTablesByName("db2", Seq("tbl1"))
+ val tablesWithCatalog = rawTables.map { t =>
+ t.copy(identifier = t.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+ }
assert(catalog.getTablesByName(
Seq(
TableIdentifier("tbl1", Some("db2")),
TableIdentifier("tblnotexit", Some("db2"))
)
- ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+ ) == tablesWithCatalog)
// Get table without explicitly specifying database
catalog.setCurrentDatabase("db2")
assert(catalog.getTablesByName(
@@ -624,7 +633,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
TableIdentifier("tbl1"),
TableIdentifier("tblnotexit")
)
- ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+ ) == tablesWithCatalog)
}
}
@@ -633,12 +642,16 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
val name = "砖"
// scalastyle:on
withBasicCatalog { catalog =>
+ val rawTables = catalog.externalCatalog.getTablesByName("db2", Seq("tbl1"))
+ val tablesWithCatalog = rawTables.map { t =>
+ t.copy(identifier = t.identifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
+ }
assert(catalog.getTablesByName(
Seq(
TableIdentifier("tbl1", Some("db2")),
TableIdentifier(name, Some("db2"))
)
- ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+ ) == tablesWithCatalog)
// Get table without explicitly specifying database
catalog.setCurrentDatabase("db2")
assert(catalog.getTablesByName(
@@ -646,7 +659,7 @@ abstract class SessionCatalogSuite extends AnalysisTest with Eventually {
TableIdentifier("tbl1"),
TableIdentifier(name)
)
- ) == catalog.externalCatalog.getTablesByName("db2", Seq("tbl1")))
+ ) == tablesWithCatalog)
}
}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
index 2376ea026f7..0db758d5147 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/LookupCatalogSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.FakeV2SessionCatalog
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -90,8 +89,7 @@ class LookupCatalogSuite extends SparkFunSuite with LookupCatalog with Inside {
case (sql, table, db) =>
inside (parseMultipartIdentifier(sql)) {
case AsTableIdentifier(ident) =>
- ident shouldEqual TableIdentifier(
- table, db, if (db.isDefined) Some(SESSION_CATALOG_NAME) else None)
+ ident shouldEqual TableIdentifier(table, db)
}
}
Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7e11b8b6d36..a3050b18251 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,7 +22,7 @@ import java.util.{Locale, Properties}
import scala.collection.JavaConverters._
import org.apache.spark.annotation.Stable
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, NoSuchTableException, UnresolvedIdentifier, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.Literal
@@ -636,18 +636,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
val catalog = df.sparkSession.sessionState.catalog
- val tableExists = catalog.tableExists(tableIdent)
- val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
- val tableIdentWithDB = CatalystIdentifier.attachSessionCatalog(
- tableIdent.copy(database = Some(db)))
- val tableName = tableIdentWithDB.unquotedString
+ val qualifiedIdent = catalog.qualifyIdentifier(tableIdent)
+ val tableExists = catalog.tableExists(qualifiedIdent)
+ val tableName = qualifiedIdent.unquotedString
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
// Do nothing
case (true, SaveMode.ErrorIfExists) =>
- throw QueryCompilationErrors.tableAlreadyExistsError(tableIdent)
+ throw QueryCompilationErrors.tableAlreadyExistsError(qualifiedIdent)
case (true, SaveMode.Overwrite) =>
// Get all input data source or hive relations of the query.
@@ -656,7 +654,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
case relation: HiveTableRelation => relation.tableMeta.identifier
}
- val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
+ val tableRelation = df.sparkSession.table(qualifiedIdent).queryExecution.analyzed
EliminateSubqueryAliases(tableRelation) match {
// check if the table is a data source table (the relation is a BaseRelation).
case LogicalRelation(dest: BaseRelation, _, _, _) if srcRelations.contains(dest) =>
@@ -669,12 +667,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
// Drop the existing table
- catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
- createTable(tableIdentWithDB)
+ catalog.dropTable(qualifiedIdent, ignoreIfNotExists = true, purge = false)
+ createTable(qualifiedIdent)
// Refresh the cache of the table in the catalog.
- catalog.refreshTable(tableIdentWithDB)
+ catalog.refreshTable(qualifiedIdent)
- case _ => createTable(tableIdentWithDB)
+ case _ => createTable(qualifiedIdent)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index a495e35bf2c..e168987189d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.CatalystIdentifier._
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical._
@@ -37,10 +36,10 @@ import org.apache.spark.sql.internal.connector.V1Function
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
/**
- * Resolves catalogs from the multi-part identifiers in SQL statements, and convert the statements
- * to the corresponding v1 or v2 commands if the resolved catalog is the session catalog.
- *
- * We can remove this rule once we implement all the catalog functionality in `V2SessionCatalog`.
+ * Converts resolved v2 commands to v1 if the catalog is the session catalog. Since the v2 commands
+ * are resolved, the referred tables/views/functions are resolved as well. This rule uses qualified
+ * identifiers to construct the v1 commands, so that v1 commands do not need to qualify identifiers
+ * again, which may lead to inconsistent behavior if the current database is changed in the middle.
*/
class ResolveSessionCatalog(val catalogManager: CatalogManager)
extends Rule[LogicalPlan] with LookupCatalog {
@@ -56,12 +55,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.addColumnWithV1TableCannotSpecifyNotNullError
}
}
- AlterTableAddColumnsCommand(ident.asTableIdentifier, cols.map(convertToStructField))
+ AlterTableAddColumnsCommand(ident, cols.map(convertToStructField))
case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")
- case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _, _) =>
+ case a @ AlterColumn(ResolvedTable(catalog, _, table: V1Table, _), _, _, _, _, _, _)
+ if isSessionCatalog(catalog) =>
if (a.column.name.length > 1) {
throw QueryCompilationErrors
.operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
@@ -92,7 +92,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
dataType,
nullable = true,
builder.build())
- AlterTableChangeColumnCommand(ident.asTableIdentifier, colName, newColumn)
+ AlterTableChangeColumnCommand(table.catalogTable.identifier, colName, newColumn)
case RenameColumn(ResolvedV1TableIdentifier(_), _, _) =>
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("RENAME COLUMN")
@@ -101,16 +101,16 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("DROP COLUMN")
case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
- AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = false)
+ AlterTableSetPropertiesCommand(ident, props, isView = false)
case UnsetTableProperties(ResolvedV1TableIdentifier(ident), keys, ifExists) =>
- AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = false)
+ AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = false)
- case SetViewProperties(ResolvedView(ident, _), props) =>
- AlterTableSetPropertiesCommand(ident.asTableIdentifier, props, isView = true)
+ case SetViewProperties(ResolvedViewIdentifier(ident), props) =>
+ AlterTableSetPropertiesCommand(ident, props, isView = true)
- case UnsetViewProperties(ResolvedView(ident, _), keys, ifExists) =>
- AlterTableUnsetPropertiesCommand(ident.asTableIdentifier, keys, ifExists, isView = true)
+ case UnsetViewProperties(ResolvedViewIdentifier(ident), keys, ifExists) =>
+ AlterTableUnsetPropertiesCommand(ident, keys, ifExists, isView = true)
case DescribeNamespace(DatabaseInSessionCatalog(db), extended, output) if conf.useV1Command =>
DescribeDatabaseCommand(db, extended, output)
@@ -121,27 +121,26 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case SetNamespaceLocation(DatabaseInSessionCatalog(db), location) if conf.useV1Command =>
AlterDatabaseSetLocationCommand(db, location)
- case RenameTable(ResolvedV1TableOrViewIdentifier(oldName), newName, isView) =>
- AlterTableRenameCommand(
- oldName.asTableIdentifier, attachSessionCatalog(newName.asTableIdentifier), isView)
+ case RenameTable(ResolvedV1TableOrViewIdentifier(oldIdent), newName, isView) =>
+ AlterTableRenameCommand(oldIdent, newName.asTableIdentifier, isView)
// Use v1 command to describe (temp) view, as v2 catalog doesn't support view yet.
case DescribeRelation(
ResolvedV1TableOrViewIdentifier(ident), partitionSpec, isExtended, output) =>
- DescribeTableCommand(ident.asTableIdentifier, partitionSpec, isExtended, output)
+ DescribeTableCommand(ident, partitionSpec, isExtended, output)
case DescribeColumn(
ResolvedViewIdentifier(ident), column: UnresolvedAttribute, isExtended, output) =>
// For views, the column will not be resolved by `ResolveReferences` because
// `ResolvedView` stores only the identifier.
- DescribeColumnCommand(ident.asTableIdentifier, column.nameParts, isExtended, output)
+ DescribeColumnCommand(ident, column.nameParts, isExtended, output)
case DescribeColumn(ResolvedV1TableIdentifier(ident), column, isExtended, output) =>
column match {
case u: UnresolvedAttribute =>
throw QueryCompilationErrors.columnDoesNotExistError(u.name)
case a: Attribute =>
- DescribeColumnCommand(ident.asTableIdentifier, a.qualifier :+ a.name, isExtended, output)
+ DescribeColumnCommand(ident, a.qualifier :+ a.name, isExtended, output)
case Alias(child, _) =>
throw QueryCompilationErrors.commandNotSupportNestedColumnError(
"DESC TABLE COLUMN", toPrettySQL(child))
@@ -180,10 +179,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
case RefreshTable(ResolvedV1TableIdentifier(ident)) =>
- RefreshTableCommand(ident.asTableIdentifier)
+ RefreshTableCommand(ident)
- case RefreshTable(r: ResolvedView) =>
- RefreshTableCommand(r.identifier.asTableIdentifier)
+ case RefreshTable(ResolvedViewIdentifier(ident)) =>
+ RefreshTableCommand(ident)
// For REPLACE TABLE [AS SELECT], we should fail if the catalog is resolved to the
// session catalog and the table provider is not v2.
@@ -207,17 +206,18 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
case DropTable(ResolvedV1TableIdentifier(ident), ifExists, purge) =>
- DropTableCommand(ident.asTableIdentifier, ifExists, isView = false, purge = purge)
+ DropTableCommand(ident, ifExists, isView = false, purge = purge)
// v1 DROP TABLE supports temp view.
case DropTable(r: ResolvedView, ifExists, purge) =>
if (!r.isTemp) {
throw QueryCompilationErrors.cannotDropViewWithDropTableError
}
- DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = false, purge = purge)
+ val ResolvedViewIdentifier(ident) = r
+ DropTableCommand(ident, ifExists, isView = false, purge = purge)
- case DropView(r: ResolvedView, ifExists) =>
- DropTableCommand(r.identifier.asTableIdentifier, ifExists, isView = true, purge = false)
+ case DropView(ResolvedViewIdentifier(ident), ifExists) =>
+ DropTableCommand(ident, ifExists, isView = true, purge = false)
case c @ CreateNamespace(DatabaseNameInSessionCatalog(name), _, _) if conf.useV1Command =>
val comment = c.properties.get(SupportsNamespaces.PROP_COMMENT)
@@ -248,60 +248,60 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// ANALYZE TABLE works on permanent views if the views are cached.
case AnalyzeTable(ResolvedV1TableOrViewIdentifier(ident), partitionSpec, noScan) =>
if (partitionSpec.isEmpty) {
- AnalyzeTableCommand(ident.asTableIdentifier, noScan)
+ AnalyzeTableCommand(ident, noScan)
} else {
- AnalyzePartitionCommand(ident.asTableIdentifier, partitionSpec, noScan)
+ AnalyzePartitionCommand(ident, partitionSpec, noScan)
}
case AnalyzeTables(DatabaseInSessionCatalog(db), noScan) =>
AnalyzeTablesCommand(Some(db), noScan)
case AnalyzeColumn(ResolvedV1TableOrViewIdentifier(ident), columnNames, allColumns) =>
- AnalyzeColumnCommand(ident.asTableIdentifier, columnNames, allColumns)
+ AnalyzeColumnCommand(ident, columnNames, allColumns)
case RepairTable(ResolvedV1TableIdentifier(ident), addPartitions, dropPartitions) =>
- RepairTableCommand(ident.asTableIdentifier, addPartitions, dropPartitions)
+ RepairTableCommand(ident, addPartitions, dropPartitions)
case LoadData(ResolvedV1TableIdentifier(ident), path, isLocal, isOverwrite, partition) =>
LoadDataCommand(
- ident.asTableIdentifier,
+ ident,
path,
isLocal,
isOverwrite,
partition)
case ShowCreateTable(ResolvedV1TableOrViewIdentifier(ident), asSerde, output) if asSerde =>
- ShowCreateTableAsSerdeCommand(ident.asTableIdentifier, output)
+ ShowCreateTableAsSerdeCommand(ident, output)
// If target is view, force use v1 command
case ShowCreateTable(ResolvedViewIdentifier(ident), _, output) =>
- ShowCreateTableCommand(ident.asTableIdentifier, output)
+ ShowCreateTableCommand(ident, output)
case ShowCreateTable(ResolvedV1TableIdentifier(ident), _, output)
- if conf.useV1Command => ShowCreateTableCommand(ident.asTableIdentifier, output)
+ if conf.useV1Command => ShowCreateTableCommand(ident, output)
- case ShowCreateTable(ResolvedTable(catalog, ident, table: V1Table, _), _, output)
- if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
- ShowCreateTableCommand(ident.asTableIdentifier, output)
+ case ShowCreateTable(ResolvedTable(catalog, _, table: V1Table, _), _, output)
+ if isSessionCatalog(catalog) && DDLUtils.isHiveTable(table.catalogTable) =>
+ ShowCreateTableCommand(table.catalogTable.identifier, output)
case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
- TruncateTableCommand(ident.asTableIdentifier, None)
+ TruncateTableCommand(ident, None)
case TruncatePartition(ResolvedV1TableIdentifier(ident), partitionSpec) =>
TruncateTableCommand(
- ident.asTableIdentifier,
+ ident,
Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)
case ShowPartitions(
ResolvedV1TableOrViewIdentifier(ident),
pattern @ (None | Some(UnresolvedPartitionSpec(_, _))), output) =>
ShowPartitionsCommand(
- ident.asTableIdentifier,
+ ident,
output,
pattern.map(_.asInstanceOf[UnresolvedPartitionSpec].spec))
case ShowColumns(ResolvedV1TableOrViewIdentifier(ident), ns, output) =>
- val v1TableName = ident.asTableIdentifier
+ val v1TableName = ident
val resolver = conf.resolver
val db = ns match {
case Some(db) if v1TableName.database.exists(!resolver(_, db.head)) =>
@@ -312,14 +312,14 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case RecoverPartitions(ResolvedV1TableIdentifier(ident)) =>
RepairTableCommand(
- ident.asTableIdentifier,
+ ident,
enableAddPartitions = true,
enableDropPartitions = false,
"ALTER TABLE RECOVER PARTITIONS")
case AddPartitions(ResolvedV1TableIdentifier(ident), partSpecsAndLocs, ifNotExists) =>
AlterTableAddPartitionCommand(
- ident.asTableIdentifier,
+ ident,
partSpecsAndLocs.asUnresolvedPartitionSpecs.map(spec => (spec.spec, spec.location)),
ifNotExists)
@@ -327,12 +327,12 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
ResolvedV1TableIdentifier(ident),
UnresolvedPartitionSpec(from, _),
UnresolvedPartitionSpec(to, _)) =>
- AlterTableRenamePartitionCommand(ident.asTableIdentifier, from, to)
+ AlterTableRenamePartitionCommand(ident, from, to)
case DropPartitions(
ResolvedV1TableIdentifier(ident), specs, ifExists, purge) =>
AlterTableDropPartitionCommand(
- ident.asTableIdentifier,
+ ident,
specs.asUnresolvedPartitionSpecs.map(_.spec),
ifExists,
purge,
@@ -344,25 +344,22 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
serdeProperties,
partitionSpec) =>
AlterTableSerDePropertiesCommand(
- ident.asTableIdentifier,
+ ident,
serdeClassName,
serdeProperties,
partitionSpec)
case SetTableLocation(ResolvedV1TableIdentifier(ident), partitionSpec, location) =>
- AlterTableSetLocationCommand(ident.asTableIdentifier, partitionSpec, location)
+ AlterTableSetLocationCommand(ident, partitionSpec, location)
- case AlterViewAs(ResolvedView(ident, _), originalText, query) =>
- AlterViewAsCommand(
- ident.asTableIdentifier,
- originalText,
- query)
+ case AlterViewAs(ResolvedViewIdentifier(ident), originalText, query) =>
+ AlterViewAsCommand(ident, originalText, query)
case CreateView(ResolvedIdentifier(catalog, ident), userSpecifiedColumns, comment,
properties, originalText, child, allowExisting, replace) =>
if (isSessionCatalog(catalog)) {
CreateViewCommand(
- name = attachSessionCatalog(ident.asTableIdentifier),
+ name = catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier),
userSpecifiedColumns = userSpecifiedColumns,
comment = comment,
properties = properties,
@@ -384,11 +381,11 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
// If target is view, force use v1 command
case ShowTableProperties(ResolvedViewIdentifier(ident), propertyKey, output) =>
- ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
+ ShowTablePropertiesCommand(ident, propertyKey, output)
case ShowTableProperties(ResolvedV1TableIdentifier(ident), propertyKey, output)
if conf.useV1Command =>
- ShowTablePropertiesCommand(ident.asTableIdentifier, propertyKey, output)
+ ShowTablePropertiesCommand(ident, propertyKey, output)
case DescribeFunction(ResolvedNonPersistentFunc(_, V1Function(info)), extended) =>
DescribeFunctionCommand(info, extended)
@@ -405,7 +402,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case DropFunction(ResolvedPersistentFunc(catalog, identifier, _), ifExists) =>
if (isSessionCatalog(catalog)) {
- val funcIdentifier = identifier.asFunctionIdentifier
+ val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
+ identifier.asFunctionIdentifier)
DropFunctionCommand(funcIdentifier, ifExists, false)
} else {
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "DROP FUNCTION")
@@ -413,7 +411,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
case RefreshFunction(ResolvedPersistentFunc(catalog, identifier, _)) =>
if (isSessionCatalog(catalog)) {
- val funcIdentifier = identifier.asFunctionIdentifier
+ val funcIdentifier = catalogManager.v1SessionCatalog.qualifyIdentifier(
+ identifier.asFunctionIdentifier)
RefreshFunctionCommand(funcIdentifier.database, funcIdentifier.funcName)
} else {
throw QueryCompilationErrors.missingCatalogAbilityError(catalog, "REFRESH FUNCTION")
@@ -431,7 +430,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
val identifier = FunctionIdentifier(ident.name(), database)
CreateFunctionCommand(
- identifier,
+ catalogManager.v1SessionCatalog.qualifyIdentifier(identifier),
className,
resources,
false,
@@ -451,9 +450,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
ignoreIfExists: Boolean,
storageFormat: CatalogStorageFormat,
provider: String): CreateTableV1 = {
- val tableDesc = buildCatalogTable(attachSessionCatalog(ident.asTableIdentifier), tableSchema,
- partitioning, tableSpec.properties, provider,
- tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
+ val tableDesc = buildCatalogTable(
+ catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier),
+ tableSchema, partitioning, tableSpec.properties, provider,
+ tableSpec.location, tableSpec.comment, storageFormat, tableSpec.external)
val mode = if (ignoreIfExists) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableV1(tableDesc, mode, query)
}
@@ -558,29 +558,29 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
}
object ResolvedViewIdentifier {
- def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
- case ResolvedView(ident, _) => Some(ident)
- case _ => None
- }
- }
-
- object ResolvedV1TableAndIdentifier {
- def unapply(resolved: LogicalPlan): Option[(V1Table, Identifier)] = resolved match {
- case ResolvedTable(catalog, ident, table: V1Table, _) if isSessionCatalog(catalog) =>
- Some(table -> ident)
+ def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+ case ResolvedView(ident, isTemp) =>
+ if (isTemp) {
+ Some(ident.asTableIdentifier)
+ } else {
+ // TODO: we should get the v1 identifier from v1 view directly, similar to `V1Table`. But
+ // there is no general view representation in DS v2 yet.
+ Some(catalogManager.v1SessionCatalog.qualifyIdentifier(ident.asTableIdentifier))
+ }
case _ => None
}
}
object ResolvedV1TableIdentifier {
- def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
- case ResolvedTable(catalog, ident, _: V1Table, _) if isSessionCatalog(catalog) => Some(ident)
+ def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
+ case ResolvedTable(catalog, _, t: V1Table, _) if isSessionCatalog(catalog) =>
+ Some(t.catalogTable.identifier)
case _ => None
}
}
object ResolvedV1TableOrViewIdentifier {
- def unapply(resolved: LogicalPlan): Option[Identifier] = resolved match {
+ def unapply(resolved: LogicalPlan): Option[TableIdentifier] = resolved match {
case ResolvedV1TableIdentifier(ident) => Some(ident)
case ResolvedViewIdentifier(ident) => Some(ident)
case _ => None
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
index a66eac63071..c20b89da95f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier}
+import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, FunctionResource}
import org.apache.spark.sql.catalyst.expressions.{Attribute, ExpressionInfo}
@@ -100,8 +100,12 @@ case class DescribeFunctionCommand(
}
override def run(sparkSession: SparkSession): Seq[Row] = {
- val identifier = CatalystIdentifier.attachSessionCatalog(
- FunctionIdentifier(info.getName, Option(info.getDb)))
+ val identifier = if (info.getDb != null) {
+ sparkSession.sessionState.catalog.qualifyIdentifier(
+ FunctionIdentifier(info.getName, Some(info.getDb)))
+ } else {
+ FunctionIdentifier(info.getName)
+ }
val name = identifier.unquotedString
val result = if (info.getClassName != null) {
Row(s"Function: $name") ::
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index a96857ab102..c0b4f8f25e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -207,8 +207,9 @@ case class AlterTableRenameCommand(
// back into the hive metastore cache
catalog.refreshTable(oldName)
catalog.renameTable(oldName, newName)
+ val newQualifiedIdent = catalog.qualifyIdentifier(oldName.copy(table = newName.table))
optStorageLevel.foreach { storageLevel =>
- sparkSession.catalog.cacheTable(newName.unquotedString, storageLevel)
+ sparkSession.catalog.cacheTable(newQualifiedIdent.unquotedString, storageLevel)
}
}
Seq.empty[Row]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3a2e32cfdc7..4a6d876ea17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -107,8 +107,9 @@ case class CreateViewCommand(
// When creating a permanent view, not allowed to reference temporary objects.
// This should be called after `qe.assertAnalyzed()` (i.e., `child` can be resolved)
- verifyTemporaryObjectsNotExists(isTemporary, name, analyzedPlan, referredTempFunctions)
- verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, name)
+ val qualifiedName = catalog.qualifyIdentifier(name)
+ verifyTemporaryObjectsNotExists(isTemporary, qualifiedName, analyzedPlan, referredTempFunctions)
+ verifyAutoGeneratedAliasesNotExists(analyzedPlan, isTemporary, qualifiedName)
if (viewType == LocalTempView) {
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
@@ -136,16 +137,16 @@ case class CreateViewCommand(
aliasedPlan,
referredTempFunctions)
catalog.createGlobalTempView(name.table, tableDefinition, overrideIfExists = replace)
- } else if (catalog.tableExists(name)) {
- val tableMetadata = catalog.getTableMetadata(name)
+ } else if (catalog.tableExists(qualifiedName)) {
+ val tableMetadata = catalog.getTableMetadata(qualifiedName)
+ val viewIdent = tableMetadata.identifier
if (allowExisting) {
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
} else if (tableMetadata.tableType != CatalogTableType.VIEW) {
- throw QueryCompilationErrors.tableIsNotViewError(name)
+ throw QueryCompilationErrors.tableIsNotViewError(viewIdent)
} else if (replace) {
// Detect cyclic view reference on CREATE OR REPLACE VIEW.
- val viewIdent = tableMetadata.identifier
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
// uncache the cached data before replacing an existing view
@@ -159,7 +160,7 @@ case class CreateViewCommand(
} else {
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
// exists.
- throw QueryCompilationErrors.viewAlreadyExistsError(name)
+ throw QueryCompilationErrors.viewAlreadyExistsError(viewIdent)
}
} else {
// Create the view if it doesn't exist.
@@ -252,8 +253,9 @@ case class AlterViewAsCommand(
override def run(session: SparkSession): Seq[Row] = {
val isTemporary = session.sessionState.catalog.isTempView(name)
- verifyTemporaryObjectsNotExists(isTemporary, name, query, referredTempFunctions)
- verifyAutoGeneratedAliasesNotExists(query, isTemporary, name)
+ val qualifiedName = session.sessionState.catalog.qualifyIdentifier(name)
+ verifyTemporaryObjectsNotExists(isTemporary, qualifiedName, query, referredTempFunctions)
+ verifyAutoGeneratedAliasesNotExists(query, isTemporary, qualifiedName)
if (isTemporary) {
alterTemporaryView(session, query)
} else {
diff --git a/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out b/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out
index 190356aad7f..08c13ccc7e0 100644
--- a/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/show-create-table.sql.out
@@ -294,7 +294,7 @@ SHOW CREATE TABLE view_SPARK_30302 AS SERDE
-- !query schema
struct<createtab_stmt:string>
-- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
aaa,
bbb)
AS SELECT a, b FROM tbl
@@ -305,7 +305,7 @@ SHOW CREATE TABLE view_SPARK_30302
-- !query schema
struct<createtab_stmt:string>
-- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
aaa,
bbb)
AS SELECT a, b FROM tbl
@@ -334,7 +334,7 @@ SHOW CREATE TABLE view_SPARK_30302 AS SERDE
-- !query schema
struct<createtab_stmt:string>
-- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
aaa COMMENT 'comment with \'quoted text\' for aaa',
bbb)
COMMENT 'This is a comment with \'quoted text\' for view'
@@ -346,7 +346,7 @@ SHOW CREATE TABLE view_SPARK_30302
-- !query schema
struct<createtab_stmt:string>
-- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
aaa COMMENT 'comment with \'quoted text\' for aaa',
bbb)
COMMENT 'This is a comment with \'quoted text\' for view'
@@ -376,7 +376,7 @@ SHOW CREATE TABLE view_SPARK_30302 AS SERDE
-- !query schema
struct<createtab_stmt:string>
-- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
aaa,
bbb)
TBLPROPERTIES (
@@ -390,7 +390,7 @@ SHOW CREATE TABLE view_SPARK_30302
-- !query schema
struct<createtab_stmt:string>
-- !query output
-CREATE VIEW default.view_SPARK_30302 (
+CREATE VIEW default.view_spark_30302 (
aaa,
bbb)
TBLPROPERTIES (
diff --git a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
index 6f674e1166c..0605af1c808 100644
--- a/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udaf.sql.out
@@ -31,7 +31,7 @@ SELECT default.myDoubleAvg(int_col1, 3) as my_avg from t1
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function spark_catalog.default.myDoubleAvg. Expected: 1; Found: 2; line 1 pos 7
+Invalid number of arguments for function spark_catalog.default.mydoubleavg. Expected: 1; Found: 2; line 1 pos 7
-- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
index e7c9cd016c3..80a3d9af942 100644
--- a/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/udf/udf-udaf.sql.out
@@ -31,7 +31,7 @@ SELECT default.myDoubleAvg(udf(int_col1), udf(3)) as my_avg from t1
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function spark_catalog.default.myDoubleAvg. Expected: 1; Found: 2; line 1 pos 7
+Invalid number of arguments for function spark_catalog.default.mydoubleavg. Expected: 1; Found: 2; line 1 pos 7
-- !query
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index e261d010444..016e24a1c4b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -252,7 +252,7 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
e = intercept[AnalysisException] {
sql(s"""LOAD DATA LOCAL INPATH "$dataFilePath" INTO TABLE $viewName""")
}.getMessage
- assert(e.contains("default.testView is a view. 'LOAD DATA' expects a table"))
+ assert(e.contains("default.testview is a view. 'LOAD DATA' expects a table"))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 44beb3a1697..c4a787cb891 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.permission.{AclEntry, AclStatus}
import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.internal.config
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, QualifiedTableName, TableIdentifier}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -66,7 +66,7 @@ class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSparkSession {
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
CatalogTable(
- identifier = CatalystIdentifier.attachSessionCatalog(name),
+ identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = schema.copy(
@@ -698,7 +698,8 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
createDatabase(catalog, "dbx")
val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
createTable(catalog, tableIdent1)
- val expectedTable = generateTable(catalog, tableIdent1)
+ val expectedTable = generateTable(
+ catalog, tableIdent1.copy(catalog = Some(SESSION_CATALOG_NAME)))
checkCatalogTables(expectedTable, catalog.getTableMetadata(tableIdent1))
}
@@ -783,7 +784,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
sql("ALTER TABLE tab1 RENAME TO default.tab2")
}
assert(e.getMessage.contains(
- s"RENAME TEMPORARY VIEW from '`tab1`' to '`$SESSION_CATALOG_NAME`.`default`.`tab2`': " +
+ s"RENAME TEMPORARY VIEW from '`tab1`' to '`default`.`tab2`': " +
"cannot specify database name 'default' in the destination table"))
val catalog = spark.sessionState.catalog
@@ -808,7 +809,7 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
sql("ALTER TABLE view1 RENAME TO default.tab2")
}
assert(e.getMessage.contains(
- s"RENAME TEMPORARY VIEW from '`view1`' to '`$SESSION_CATALOG_NAME`.`default`.`tab2`': " +
+ s"RENAME TEMPORARY VIEW from '`view1`' to '`default`.`tab2`': " +
"cannot specify database name 'default' in the destination table"))
val catalog = spark.sessionState.catalog
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 610fcd54b2a..56362749cf1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -126,30 +126,20 @@ class PlanResolutionSuite extends AnalysisTest {
t
}
- private val v1Table: V1Table = {
+ private def createV1TableMock(
+ ident: Identifier,
+ provider: String = v1Format,
+ tableType: CatalogTableType = CatalogTableType.MANAGED): V1Table = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
val t = mock(classOf[CatalogTable])
when(t.schema).thenReturn(new StructType()
.add("i", "int")
.add("s", "string")
.add("point", new StructType().add("x", "int").add("y", "int")))
- when(t.tableType).thenReturn(CatalogTableType.MANAGED)
- when(t.provider).thenReturn(Some(v1Format))
- V1Table(t)
- }
-
- private val v1HiveTable: V1Table = {
- val t = mock(classOf[CatalogTable])
- when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
- when(t.tableType).thenReturn(CatalogTableType.MANAGED)
- when(t.provider).thenReturn(Some("hive"))
- V1Table(t)
- }
-
- private val view: V1Table = {
- val t = mock(classOf[CatalogTable])
- when(t.schema).thenReturn(new StructType().add("i", "int").add("s", "string"))
- when(t.tableType).thenReturn(CatalogTableType.VIEW)
- when(t.provider).thenReturn(Some(v1Format))
+ when(t.tableType).thenReturn(tableType)
+ when(t.provider).thenReturn(Some(provider))
+ when(t.identifier).thenReturn(
+ ident.asTableIdentifier.copy(catalog = Some(SESSION_CATALOG_NAME)))
V1Table(t)
}
@@ -174,14 +164,14 @@ class PlanResolutionSuite extends AnalysisTest {
private val v2SessionCatalog: TableCatalog = {
val newCatalog = mock(classOf[TableCatalog])
when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => {
- invocation.getArgument[Identifier](0).name match {
- case "v1Table" => v1Table
- case "v1Table1" => v1Table
- case "v1HiveTable" => v1HiveTable
+ val ident = invocation.getArgument[Identifier](0)
+ ident.name match {
+ case "v1Table" | "v1Table1" => createV1TableMock(ident)
+ case "v1HiveTable" => createV1TableMock(ident, provider = "hive")
case "v2Table" => table
case "v2Table1" => table1
case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability
- case "view" => view
+ case "view" => createV1TableMock(ident, tableType = CatalogTableType.VIEW)
case name => throw new NoSuchTableException(name)
}
})
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala
index e46e2ce0d14..e49d470f322 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowFunctionsSuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.execution.command.v1
+import java.util.Locale
+
import test.org.apache.spark.sql.MyDoubleSum
import org.apache.spark.sql.execution.command
@@ -37,6 +39,10 @@ trait ShowFunctionsSuiteBase extends command.ShowFunctionsSuiteBase
override protected def dropFunction(name: String): Unit = {
sql(s"DROP FUNCTION IF EXISTS $name")
}
+ override protected def qualifiedFunName(ns: String, name: String): String = {
+ // `SessionCatalog` lower-cases function names before creating.
+ super.qualifiedFunName(ns, name).toLowerCase(Locale.ROOT)
+ }
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index deb8a458110..94663e5c2ec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -49,7 +49,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{DatabaseAlreadyExistsException, NoSuchDatabaseException, NoSuchPartitionException, NoSuchPartitionsException, NoSuchTableException, PartitionAlreadyExistsException, PartitionsAlreadyExistException}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -503,8 +503,7 @@ private[hive] class HiveClientImpl(
val comment = properties.get("comment")
CatalogTable(
- identifier = CatalystIdentifier.attachSessionCatalog(
- TableIdentifier(h.getTableName, Option(h.getDbName))),
+ identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL
case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 53feae0d8e8..95e5582cb8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -41,7 +41,7 @@ import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.spark.internal.Logging
import org.apache.spark.metrics.source.HiveCatalogMetrics
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, FunctionIdentifier, InternalRow}
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, CatalogTablePartition, CatalogUtils, ExternalCatalogUtils, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -809,8 +809,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
}
private def fromHiveFunction(hf: HiveFunction): CatalogFunction = {
- val name = CatalystIdentifier.attachSessionCatalog(
- FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName)))
+ val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName))
val resources = hf.getResourceUris.asScala.map { uri =>
val resourceType = uri.getResourceType() match {
case ResourceType.ARCHIVE => "archive"
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index f6165e0f9d6..d755dce90e0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -336,7 +336,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}.getMessage
assert(
- message.contains(s"Table $SESSION_CATALOG_NAME.default.ctasJsonTable already exists."),
+ message.contains(s"Table $SESSION_CATALOG_NAME.default.ctasjsontable already exists."),
"We should complain that ctasJsonTable already exists")
// The following statement should be fine if it has IF NOT EXISTS.
@@ -526,7 +526,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
intercept[AnalysisException] {
sparkSession.catalog.createTable("createdJsonTable", jsonFilePath.toString)
}.getMessage.contains(
- s"Table $SESSION_CATALOG_NAME.default.createdJsonTable already exists."))
+ s"Table $SESSION_CATALOG_NAME.default.createdjsontable already exists."))
}
// Data should not be deleted.
@@ -909,7 +909,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
createDF(10, 19).write.mode(SaveMode.Append).format("orc").saveAsTable("appendOrcToParquet")
}
assert(e.getMessage.contains("The format of the existing table " +
- s"$SESSION_CATALOG_NAME.default.appendOrcToParquet is `Parquet"))
+ s"$SESSION_CATALOG_NAME.default.appendorctoparquet is `Parquet"))
}
withTable("appendParquetToJson") {
@@ -920,7 +920,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}.getMessage
assert(msg.contains("The format of the existing table " +
- s"$SESSION_CATALOG_NAME.default.appendParquetToJson is `Json"))
+ s"$SESSION_CATALOG_NAME.default.appendparquettojson is `Json"))
}
withTable("appendTextToJson") {
@@ -931,7 +931,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}.getMessage
// The format of the existing table can be JsonDataSourceV2 or JsonFileFormat.
assert(msg.contains("The format of the existing table " +
- s"$SESSION_CATALOG_NAME.default.appendTextToJson is `Json"))
+ s"$SESSION_CATALOG_NAME.default.appendtexttojson is `Json"))
}
}
@@ -1251,7 +1251,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
e = intercept[AnalysisException] {
table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
}.getMessage
- assert(e.contains(s"Table `$tableName` already exists"))
+ assert(e.contains(s"Table `$SESSION_CATALOG_NAME`.`default`.`$tableName` already exists"))
}
}
@@ -1346,8 +1346,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
withDebugMode {
val tableMeta = sharedState.externalCatalog.getTable("default", "t")
- assert(tableMeta.identifier ==
- TableIdentifier("t", Some("default"), Some(SESSION_CATALOG_NAME)))
+ assert(tableMeta.identifier == TableIdentifier("t", Some("default")))
assert(tableMeta.properties(DATASOURCE_PROVIDER) == "json")
}
} finally {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 9fd2e6e7f62..87773d502ae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
-import org.apache.spark.sql.catalyst.{CatalystIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -93,7 +93,7 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
.add("col1", "int", nullable = true, metadata = metadata)
.add("col2", "string")
CatalogTable(
- identifier = CatalystIdentifier.attachSessionCatalog(name),
+ identifier = name,
tableType = CatalogTableType.EXTERNAL,
storage = storage,
schema = schema.copy(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala
index 95d7af00de3..84d1c209dd4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowFunctionsSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.hive.execution.command
-import java.util.Locale
-
import org.apache.spark.sql.execution.command.v1
/**
@@ -26,8 +24,4 @@ import org.apache.spark.sql.execution.command.v1
*/
class ShowFunctionsSuite extends v1.ShowFunctionsSuiteBase with CommandSuiteBase {
override def commandVersion: String = super[ShowFunctionsSuiteBase].commandVersion
- override def qualifiedFunName(ns: String, name: String): String = {
- // Hive Metastore lower-cases all identifiers.
- super.qualifiedFunName(ns, name).toLowerCase(Locale.ROOT)
- }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
index 2637a1d18e3..ce800e88218 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/HadoopFsRelationTest.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
import org.apache.spark.sql.execution.DataSourceScanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -384,7 +385,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
val msg = intercept[AnalysisException] {
testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
}.getMessage
- assert(msg.contains("Table `t` already exists"))
+ assert(msg.contains(s"Table `$SESSION_CATALOG_NAME`.`default`.`t` already exists"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org