You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2024/02/28 05:48:32 UTC
(spark) branch master updated: [SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6d41866e6314 [SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view
6d41866e6314 is described below
commit 6d41866e631403d994e8ad256c5323abc3548532
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Feb 28 13:48:18 2024 +0800
[SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view
### What changes were proposed in this pull request?
Some commands like ALTER TABLE and REPLACE VIEW need to uncache the table/view. Today the implementation simply does `sparkSession.catalog.uncacheTable(name)`, which looks up the table/view. This PR improves it by using the existing `CacheManager.uncacheTableOrView` function, which does not need to look up the table/view.
### Why are the changes needed?
small perf improvement (reduce metastore RPC calls)
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
existing tests
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #45289 from cloud-fan/view.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/connector/catalog/CatalogV2Implicits.scala | 4 ++++
.../org/apache/spark/sql/execution/CacheManager.scala | 13 +++++--------
.../spark/sql/execution/command/CommandUtils.scala | 19 ++++++++++++-------
.../apache/spark/sql/execution/command/tables.scala | 4 ++--
.../apache/spark/sql/execution/command/views.scala | 6 +++---
.../datasources/v2/DataSourceV2Strategy.scala | 6 ++----
.../sql/hive/execution/InsertIntoHiveTable.scala | 2 +-
7 files changed, 29 insertions(+), 25 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 2b712241633b..bf4cd2eedc83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -169,6 +169,10 @@ private[sql] object CatalogV2Implicits {
case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
}
+
+ def toQualifiedNameParts(catalog: CatalogPlugin): Seq[String] = {
+ (catalog.name() +: ident.namespace() :+ ident.name()).toImmutableArraySeq
+ }
}
implicit class MultipartIdentifierHelper(parts: Seq[String]) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index db6266fe1756..6c5639ef99d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-import org.apache.spark.util.ArrayImplicits._
/** Holds a cached logical plan and its data */
case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
@@ -170,22 +169,20 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
plan match {
case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
val v1Ident = catalogTable.identifier
- isSameName(ident.qualifier :+ ident.name) &&
- isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+ isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
isSameName(ident.qualifier :+ ident.name) &&
- isSameName((catalog.name() +: v2Ident.namespace() :+ v2Ident.name()).toImmutableArraySeq)
+ isSameName(v2Ident.toQualifiedNameParts(catalog))
case SubqueryAlias(ident, View(catalogTable, _, _)) =>
val v1Ident = catalogTable.identifier
- isSameName(ident.qualifier :+ ident.name) &&
- isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+ isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
case SubqueryAlias(ident, HiveTableRelation(catalogTable, _, _, _, _)) =>
val v1Ident = catalogTable.identifier
- isSameName(ident.qualifier :+ ident.name) &&
- isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+ isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
case _ => false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 73478272a684..eccf16ecea13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions._
@@ -462,12 +462,17 @@ object CommandUtils extends Logging {
!path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
}
- def uncacheTableOrView(sparkSession: SparkSession, name: String): Unit = {
- try {
- sparkSession.catalog.uncacheTable(name)
- } catch {
- case NonFatal(e) => logWarning(s"Exception when attempting to uncache $name", e)
- }
+ def uncacheTableOrView(sparkSession: SparkSession, ident: ResolvedIdentifier): Unit = {
+ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+ uncacheTableOrView(sparkSession, ident.identifier.toQualifiedNameParts(ident.catalog))
+ }
+
+ def uncacheTableOrView(sparkSession: SparkSession, ident: TableIdentifier): Unit = {
+ uncacheTableOrView(sparkSession, ident.nameParts)
+ }
+
+ private def uncacheTableOrView(sparkSession: SparkSession, name: Seq[String]): Unit = {
+ sparkSession.sharedState.cacheManager.uncacheTableOrView(sparkSession, name, cascade = true)
}
def calculateRowCountsPerPartition(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index fa288fd94ea9..1a97b965da2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -203,7 +203,7 @@ case class AlterTableRenameCommand(
sparkSession.table(oldName.unquotedString))
val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
if (optStorageLevel.isDefined) {
- CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
+ CommandUtils.uncacheTableOrView(sparkSession, oldName)
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
@@ -235,7 +235,7 @@ case class AlterTableAddColumnsCommand(
val colsWithProcessedDefaults =
constantFoldCurrentDefaultsToExistDefaults(sparkSession, catalogTable.provider)
- CommandUtils.uncacheTableOrView(sparkSession, table.quotedString)
+ CommandUtils.uncacheTableOrView(sparkSession, table)
catalog.refreshTable(table)
SchemaUtils.checkColumnNameDuplication(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3a761541a00e..d71d0d43683c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -155,7 +155,7 @@ case class CreateViewCommand(
// uncache the cached data before replacing an exists view
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
- CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
+ CommandUtils.uncacheTableOrView(sparkSession, viewIdent)
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
// Nothing we need to retain from the old view, so just drop and create a new one
@@ -298,7 +298,7 @@ case class AlterViewAsCommand(
checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
- CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
+ CommandUtils.uncacheTableOrView(session, viewIdent)
val newProperties = generateViewProperties(
viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)
@@ -667,7 +667,7 @@ object ViewHelper extends SQLConfHelper with Logging {
// view is already converted to the underlying tables. So no cyclic views.
checkCyclicViewReference(analyzedPlan, Seq(name), name)
}
- CommandUtils.uncacheTableOrView(session, name.quotedString)
+ CommandUtils.uncacheTableOrView(session, name)
}
if (!storeAnalyzedPlanForView) {
TemporaryViewRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 30d05350aa72..d33ecea6e0db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation, PushableColumnAndNestedColumn}
import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
@@ -348,10 +349,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
}
case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
- val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView(
- session,
- (r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name()).toImmutableArraySeq,
- cascade = true)
+ val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
case _: NoopCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 74d131d6664f..4a92bfd84040 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -108,7 +108,7 @@ case class InsertIntoHiveTable(
}
// un-cache this table.
- CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
+ CommandUtils.uncacheTableOrView(sparkSession, table.identifier)
sparkSession.sessionState.catalog.refreshTable(table.identifier)
CommandUtils.updateTableStats(sparkSession, table)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org