Posted to commits@spark.apache.org by gu...@apache.org on 2021/01/10 03:45:36 UTC

[spark] branch branch-3.1 updated: [SPARK-34039][SQL][3.1] ReplaceTable should invalidate cache

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5ed725c  [SPARK-34039][SQL][3.1] ReplaceTable should invalidate cache
5ed725c is described below

commit 5ed725cbd11856cc0641d5c7b536495952768b93
Author: Chao Sun <su...@apple.com>
AuthorDate: Sun Jan 10 12:45:01 2021 +0900

    [SPARK-34039][SQL][3.1] ReplaceTable should invalidate cache
    
    ### What changes were proposed in this pull request?
    
    This is a backport of #31081 to branch-3.1.
    
    This changes `ReplaceTableExec`/`AtomicReplaceTableExec` to uncache the target table before it is dropped. In addition, this includes some refactoring: the `uncacheTable` method is moved to `DataSourceV2Strategy` so that we no longer need to pass a Spark session to the v2 exec nodes.
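    
    For context, here is a condensed sketch of the refactored cache-invalidation helper, with the signatures taken from the hunks below. The standalone `invalidateCache(session)(...)` wrapper is illustrative; in the actual patch it is a private method of `DataSourceV2Strategy` that closes over the strategy's own `session`, and `cacheManager` is an internal API:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
    
    // The strategy owns the SparkSession and closes over it, so each exec node
    // receives a plain (TableCatalog, Table, Identifier) => Unit callback
    // instead of the whole session.
    def invalidateCache(session: SparkSession)(
        catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
      val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
      session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
    }
    ```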
    
    ### Why are the changes needed?
    
    Similar to SPARK-33492 (#30429). When a table is replaced, the associated cache should be invalidated to avoid potential incorrect results.
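    
    As a minimal illustration of the failure mode this guards against (using the `testcat` catalog and `foo` provider from the test suite diff below):
    
    ```scala
    spark.sql("CREATE TABLE testcat.ns.t USING foo AS SELECT 1L AS id")
    spark.sql("CACHE TABLE v AS SELECT id FROM testcat.ns.t")
    spark.sql("REPLACE TABLE testcat.ns.t (a bigint) USING foo")
    // Without invalidation, the cached view `v` may keep serving rows built
    // from the dropped table definition instead of being recomputed.
    spark.sql("SELECT * FROM v").show()
    ```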
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Now, when a data source v2 table is cached (either directly or indirectly), all the relevant caches will be refreshed or invalidated if the table is replaced.
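    
    Continuing the snippet above, the invalidation can be observed directly (`lookupCachedData` is an internal API, used the same way in the new test below):
    
    ```scala
    // After this patch, the cache entry built on the replaced table is gone:
    assert(spark.sharedState.cacheManager.lookupCachedData(spark.table("v")).isEmpty)
    ```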
    
    ### How was this patch tested?
    
    Added a new unit test.
    
    Closes #31100 from sunchao/SPARK-34039-branch-3.1.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../datasources/v2/DataSourceV2Strategy.scala      | 21 ++++++++++++++-------
 .../datasources/v2/ReplaceTableExec.scala          | 14 +++++++++++---
 .../datasources/v2/WriteToDataSourceV2Exec.scala   | 22 ++++++----------------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 17 +++++++++++++++++
 4 files changed, 48 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 34bc80c..dffe0a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedPartit
 import org.apache.spark.sql.catalyst.expressions.{And, Expression, NamedExpression, PredicateHelper, SubqueryExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, TableCapability, TableCatalog, TableChange}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagingTableCatalog, SupportsNamespaces, SupportsPartitionManagement, Table, TableCapability, TableCatalog, TableChange}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.execution.{FilterExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
@@ -78,6 +78,11 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     }
   }
 
+  private def invalidateCache(catalog: TableCatalog, table: Table, ident: Identifier): Unit = {
+    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters,
         relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
@@ -161,10 +166,12 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableExec(
-            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+            staging, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+            invalidateCache) :: Nil
         case _ =>
           ReplaceTableExec(
-            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate) :: Nil
+            catalog, ident, schema, parts, propsWithOwner, orCreate = orCreate,
+            invalidateCache) :: Nil
       }
 
     case ReplaceTableAsSelect(catalog, ident, parts, query, props, options, orCreate) =>
@@ -173,7 +180,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       catalog match {
         case staging: StagingTableCatalog =>
           AtomicReplaceTableAsSelectExec(
-            session,
             staging,
             ident,
             parts,
@@ -181,10 +187,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             planLater(query),
             propsWithOwner,
             writeOptions,
-            orCreate = orCreate) :: Nil
+            orCreate = orCreate,
+            invalidateCache) :: Nil
         case _ =>
           ReplaceTableAsSelectExec(
-            session,
             catalog,
             ident,
             parts,
@@ -192,7 +198,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
             planLater(query),
             propsWithOwner,
             writeOptions,
-            orCreate = orCreate) :: Nil
+            orCreate = orCreate,
+            invalidateCache) :: Nil
       }
 
     case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
index 1f3bcf2..10c09f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ReplaceTableExec.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -33,10 +33,13 @@ case class ReplaceTableExec(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
-    orCreate: Boolean) extends V2CommandExec {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
+      val table = catalog.loadTable(ident)
+      invalidateCache(catalog, table, ident)
       catalog.dropTable(ident)
     } else if (!orCreate) {
       throw new CannotReplaceMissingTableException(ident)
@@ -54,9 +57,14 @@ case class AtomicReplaceTableExec(
     tableSchema: StructType,
     partitioning: Seq[Transform],
     tableProperties: Map[String, String],
-    orCreate: Boolean) extends V2CommandExec {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends V2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
+    if (catalog.tableExists(identifier)) {
+      val table = catalog.loadTable(identifier)
+      invalidateCache(catalog, table, identifier)
+    }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
         identifier, tableSchema, partitioning.toArray, tableProperties.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
index b0aff4a..a41b048 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala
@@ -26,7 +26,6 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchTableException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -131,7 +130,6 @@ case class AtomicCreateTableAsSelectExec(
  * ReplaceTableAsSelectStagingExec.
  */
 case class ReplaceTableAsSelectExec(
-    session: SparkSession,
     catalog: TableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
@@ -139,7 +137,8 @@ case class ReplaceTableAsSelectExec(
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
-    orCreate: Boolean) extends TableWriteExecHelper {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
 
   override protected def run(): Seq[InternalRow] = {
     // Note that this operation is potentially unsafe, but these are the strict semantics of
@@ -152,7 +151,7 @@ case class ReplaceTableAsSelectExec(
     // 3. The table returned by catalog.createTable doesn't support writing.
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
-      uncacheTable(session, catalog, table, ident)
+      invalidateCache(catalog, table, ident)
       catalog.dropTable(ident)
     } else if (!orCreate) {
       throw new CannotReplaceMissingTableException(ident)
@@ -177,7 +176,6 @@ case class ReplaceTableAsSelectExec(
  * is left untouched.
  */
 case class AtomicReplaceTableAsSelectExec(
-    session: SparkSession,
     catalog: StagingTableCatalog,
     ident: Identifier,
     partitioning: Seq[Transform],
@@ -185,13 +183,14 @@ case class AtomicReplaceTableAsSelectExec(
     query: SparkPlan,
     properties: Map[String, String],
     writeOptions: CaseInsensitiveStringMap,
-    orCreate: Boolean) extends TableWriteExecHelper {
+    orCreate: Boolean,
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
 
   override protected def run(): Seq[InternalRow] = {
     val schema = CharVarcharUtils.getRawSchema(query.schema).asNullable
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
-      uncacheTable(session, catalog, table, ident)
+      invalidateCache(catalog, table, ident)
     }
     val staged = if (orCreate) {
       catalog.stageCreateOrReplace(
@@ -393,15 +392,6 @@ trait V2TableWriteExec extends V2CommandExec with UnaryExecNode {
 
     Nil
   }
-
-  protected def uncacheTable(
-      session: SparkSession,
-      catalog: TableCatalog,
-      table: Table,
-      ident: Identifier): Unit = {
-    val plan = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-    session.sharedState.cacheManager.uncacheQuery(session, plan, cascade = true)
-  }
 }
 
 object DataWritingSparkTask extends Logging {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 92edd2b..f0f6e7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -785,6 +785,23 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-34039: ReplaceTable (atomic or non-atomic) should invalidate cache") {
+    Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
+      val view = "view"
+      withTable(t) {
+        withTempView(view) {
+          sql(s"CREATE TABLE $t USING foo AS SELECT id, data FROM source")
+          sql(s"CACHE TABLE $view AS SELECT id FROM $t")
+          checkAnswer(sql(s"SELECT * FROM $t"), spark.table("source"))
+          checkAnswer(sql(s"SELECT * FROM $view"), spark.table("source").select("id"))
+
+          sql(s"REPLACE TABLE $t (a bigint) USING foo")
+          assert(spark.sharedState.cacheManager.lookupCachedData(spark.table(view)).isEmpty)
+        }
+      }
+    }
+  }
+
   test("SPARK-33492: ReplaceTableAsSelect (atomic or non-atomic) should invalidate cache") {
     Seq("testcat.ns.t", "testcat_atomic.ns.t").foreach { t =>
       val view = "view"

