Posted to commits@spark.apache.org by we...@apache.org on 2024/02/28 05:48:32 UTC

(spark) branch master updated: [SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d41866e6314 [SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view
6d41866e6314 is described below

commit 6d41866e631403d994e8ad256c5323abc3548532
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Wed Feb 28 13:48:18 2024 +0800

    [SPARK-47191][SQL] Avoid unnecessary relation lookup when uncaching table/view
    
    ### What changes were proposed in this pull request?
    
    Some commands like ALTER TABLE and REPLACE VIEW need to uncache the table/view. Today the implementation simply calls `sparkSession.catalog.uncacheTable(name)`, which looks up the table/view in the catalog. This PR improves it by using the existing `CacheManager.uncacheTableOrView` function, which does not need to look up the table/view.
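    
    As a rough illustration of the difference (a minimal sketch; `uncacheOld`, `uncacheNew`, and their parameters are placeholder names, not part of this change):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    
    // Old path (sketch): resolve the name through the session catalog, which can
    // issue metastore RPCs just to find the relation before uncaching it.
    def uncacheOld(spark: SparkSession, name: String): Unit =
      spark.catalog.uncacheTable(name)
    
    // New path (sketch): hand the qualified name parts straight to the
    // CacheManager, so no relation lookup is needed. `cascade = true` also drops
    // cached plans that depend on this table/view. These are Spark-internal APIs,
    // used here only to illustrate the code path the touched commands now take.
    def uncacheNew(spark: SparkSession, ident: TableIdentifier): Unit =
      spark.sharedState.cacheManager.uncacheTableOrView(spark, ident.nameParts, cascade = true)
    ```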
    
    ### Why are the changes needed?
    
    Small performance improvement: it reduces metastore RPC calls.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    no
    
    Closes #45289 from cloud-fan/view.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/connector/catalog/CatalogV2Implicits.scala    |  4 ++++
 .../org/apache/spark/sql/execution/CacheManager.scala | 13 +++++--------
 .../spark/sql/execution/command/CommandUtils.scala    | 19 ++++++++++++-------
 .../apache/spark/sql/execution/command/tables.scala   |  4 ++--
 .../apache/spark/sql/execution/command/views.scala    |  6 +++---
 .../datasources/v2/DataSourceV2Strategy.scala         |  6 ++----
 .../sql/hive/execution/InsertIntoHiveTable.scala      |  2 +-
 7 files changed, 29 insertions(+), 25 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
index 2b712241633b..bf4cd2eedc83 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Implicits.scala
@@ -169,6 +169,10 @@ private[sql] object CatalogV2Implicits {
       case Array(dbName) => FunctionIdentifier(ident.name(), Some(dbName))
       case _ => throw QueryCompilationErrors.identifierTooManyNamePartsError(original)
     }
+
+    def toQualifiedNameParts(catalog: CatalogPlugin): Seq[String] = {
+      (catalog.name() +: ident.namespace() :+ ident.name()).toImmutableArraySeq
+    }
   }
 
   implicit class MultipartIdentifierHelper(parts: Seq[String]) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index db6266fe1756..6c5639ef99d4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
-import org.apache.spark.util.ArrayImplicits._
 
 /** Holds a cached logical plan and its data */
 case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
@@ -170,22 +169,20 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     plan match {
       case SubqueryAlias(ident, LogicalRelation(_, _, Some(catalogTable), _)) =>
         val v1Ident = catalogTable.identifier
-        isSameName(ident.qualifier :+ ident.name) &&
-          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+        isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
 
       case SubqueryAlias(ident, DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _)) =>
+        import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
         isSameName(ident.qualifier :+ ident.name) &&
-          isSameName((catalog.name() +: v2Ident.namespace() :+ v2Ident.name()).toImmutableArraySeq)
+          isSameName(v2Ident.toQualifiedNameParts(catalog))
 
       case SubqueryAlias(ident, View(catalogTable, _, _)) =>
         val v1Ident = catalogTable.identifier
-        isSameName(ident.qualifier :+ ident.name) &&
-          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+        isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
 
       case SubqueryAlias(ident, HiveTableRelation(catalogTable, _, _, _, _)) =>
         val v1Ident = catalogTable.identifier
-        isSameName(ident.qualifier :+ ident.name) &&
-          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ v1Ident.table)
+        isSameName(ident.qualifier :+ ident.name) && isSameName(v1Ident.nameParts)
 
       case _ => false
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 73478272a684..eccf16ecea13 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Column, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
+import org.apache.spark.sql.catalyst.analysis.{ResolvedIdentifier, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition, CatalogTableType, ExternalCatalogUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
@@ -462,12 +462,17 @@ object CommandUtils extends Logging {
     !path.getName.startsWith(stagingDir) && DataSourceUtils.isDataPath(path)
   }
 
-  def uncacheTableOrView(sparkSession: SparkSession, name: String): Unit = {
-    try {
-      sparkSession.catalog.uncacheTable(name)
-    } catch {
-      case NonFatal(e) => logWarning(s"Exception when attempting to uncache $name", e)
-    }
+  def uncacheTableOrView(sparkSession: SparkSession, ident: ResolvedIdentifier): Unit = {
+    import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
+    uncacheTableOrView(sparkSession, ident.identifier.toQualifiedNameParts(ident.catalog))
+  }
+
+  def uncacheTableOrView(sparkSession: SparkSession, ident: TableIdentifier): Unit = {
+    uncacheTableOrView(sparkSession, ident.nameParts)
+  }
+
+  private def uncacheTableOrView(sparkSession: SparkSession, name: Seq[String]): Unit = {
+    sparkSession.sharedState.cacheManager.uncacheTableOrView(sparkSession, name, cascade = true)
   }
 
   def calculateRowCountsPerPartition(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index fa288fd94ea9..1a97b965da2b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -203,7 +203,7 @@ case class AlterTableRenameCommand(
         sparkSession.table(oldName.unquotedString))
       val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
       if (optStorageLevel.isDefined) {
-        CommandUtils.uncacheTableOrView(sparkSession, oldName.unquotedString)
+        CommandUtils.uncacheTableOrView(sparkSession, oldName)
       }
       // Invalidate the table last, otherwise uncaching the table would load the logical plan
       // back into the hive metastore cache
@@ -235,7 +235,7 @@ case class AlterTableAddColumnsCommand(
     val colsWithProcessedDefaults =
       constantFoldCurrentDefaultsToExistDefaults(sparkSession, catalogTable.provider)
 
-    CommandUtils.uncacheTableOrView(sparkSession, table.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table)
     catalog.refreshTable(table)
 
     SchemaUtils.checkColumnNameDuplication(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 3a761541a00e..d71d0d43683c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -155,7 +155,7 @@ case class CreateViewCommand(
 
         // uncache the cached data before replacing an exists view
         logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-        CommandUtils.uncacheTableOrView(sparkSession, viewIdent.quotedString)
+        CommandUtils.uncacheTableOrView(sparkSession, viewIdent)
 
         // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
         // Nothing we need to retain from the old view, so just drop and create a new one
@@ -298,7 +298,7 @@ case class AlterViewAsCommand(
     checkCyclicViewReference(analyzedPlan, Seq(viewIdent), viewIdent)
 
     logDebug(s"Try to uncache ${viewIdent.quotedString} before replacing.")
-    CommandUtils.uncacheTableOrView(session, viewIdent.quotedString)
+    CommandUtils.uncacheTableOrView(session, viewIdent)
 
     val newProperties = generateViewProperties(
       viewMeta.properties, session, analyzedPlan, analyzedPlan.schema.fieldNames)
@@ -667,7 +667,7 @@ object ViewHelper extends SQLConfHelper with Logging {
         // view is already converted to the underlying tables. So no cyclic views.
         checkCyclicViewReference(analyzedPlan, Seq(name), name)
       }
-      CommandUtils.uncacheTableOrView(session, name.quotedString)
+      CommandUtils.uncacheTableOrView(session, name)
     }
     if (!storeAnalyzedPlanForView) {
       TemporaryViewRelation(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 30d05350aa72..d33ecea6e0db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBat
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
@@ -348,10 +349,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }
 
     case DropTable(r: ResolvedIdentifier, ifExists, purge) =>
-      val invalidateFunc = () => session.sharedState.cacheManager.uncacheTableOrView(
-        session,
-        (r.catalog.name() +: r.identifier.namespace() :+ r.identifier.name()).toImmutableArraySeq,
-        cascade = true)
+      val invalidateFunc = () => CommandUtils.uncacheTableOrView(session, r)
       DropTableExec(r.catalog.asTableCatalog, r.identifier, ifExists, purge, invalidateFunc) :: Nil
 
     case _: NoopCommand =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 74d131d6664f..4a92bfd84040 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -108,7 +108,7 @@ case class InsertIntoHiveTable(
     }
 
     // un-cache this table.
-    CommandUtils.uncacheTableOrView(sparkSession, table.identifier.quotedString)
+    CommandUtils.uncacheTableOrView(sparkSession, table.identifier)
     sparkSession.sessionState.catalog.refreshTable(table.identifier)
 
     CommandUtils.updateTableStats(sparkSession, table)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org