Posted to commits@spark.apache.org by do...@apache.org on 2020/10/31 17:02:08 UTC

[spark] branch branch-3.0 updated: [SPARK-33290][SQL] REFRESH TABLE should invalidate cache even though the table itself may not be cached

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fc10531  [SPARK-33290][SQL] REFRESH TABLE should invalidate cache even though the table itself may not be cached
fc10531 is described below

commit fc105310d57f8186680320f3d460638c3a1fbb70
Author: Chao Sun <su...@apple.com>
AuthorDate: Sat Oct 31 09:49:18 2020 -0700

    [SPARK-33290][SQL] REFRESH TABLE should invalidate cache even though the table itself may not be cached
    
    ### What changes were proposed in this pull request?
    
    In `CatalogImpl.refreshTable`, this PR moves the `uncacheQuery` call out of the condition `if (cache.nonEmpty)` so that it is called whether or not the table itself is cached.
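
    Schematically, the call is hoisted out of the cached-table branch (a condensed sketch of the diff below, not the exact method body):

    ```scala
    // Before: uncacheQuery ran only inside the cached-table branch, so for an
    // uncached table nothing was invalidated and dependent caches went stale.
    val cache = sparkSession.sharedState.cacheManager.lookupCachedData(table)
    if (cache.nonEmpty) {
      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
      // ... recache with the saved name and storage level ...
    }

    // After: always uncache with cascade = true, then recache only if the
    // table itself was cached before the refresh.
    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
    if (cache.nonEmpty) {
      // ... recache with the saved name and storage level ...
    }
    ```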
    
    ### Why are the changes needed?
    
    In the case like the following:
    ```sql
    CREATE TABLE t ...;
    CREATE VIEW t1 AS SELECT * FROM t;
    REFRESH TABLE t;
    ```
    
    If the table `t` is refreshed, the view `t1`, which depends on `t`, will not be invalidated. This could lead to incorrect results and is similar to [SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765).
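
    For illustration, the uncached case through the Scala API (a minimal sketch for spark-shell; the table name and data are hypothetical):

    ```scala
    spark.sql("CREATE TABLE t (i INT) USING parquet")
    spark.sql("INSERT INTO t VALUES (1)")
    // t1's cached plan references t, but t itself is never cached.
    spark.sql("CACHE TABLE t1 AS SELECT * FROM t")
    // ... the files backing t are then changed outside of Spark ...
    spark.sql("REFRESH TABLE t")
    // Before this fix: t1 kept answering from its now-stale cache.
    // After this fix: t1's cache is invalidated and rebuilt on next access.
    spark.sql("SELECT * FROM t1").show()
    ```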
    
    On the other hand, if we have:
    
    ```sql
    CREATE TABLE t ...;
    CACHE TABLE t;
    CREATE VIEW t1 AS SELECT * FROM t;
    REFRESH TABLE t;
    ```
    
    Then the view `t1` will be refreshed as well, because `t` itself is cached. The behavior between the two cases is inconsistent.
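
    For contrast, a sketch of the cached case under the old code path (same hypothetical session):

    ```scala
    spark.sql("CACHE TABLE t")                      // now t itself is cached
    spark.sql("CACHE TABLE t1 AS SELECT * FROM t")  // t1's cache references t
    spark.sql("REFRESH TABLE t")
    // Old behavior: because t was cached, the refresh reached
    // uncacheQuery(..., cascade = true), which invalidated t1's cache too;
    // the uncached case above skipped that call entirely.
    ```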
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. With this change, any caches depending on the refreshed table will be invalidated. Previously this happened only if the table itself was cached.
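
    Equivalently through the public catalog API (a minimal sketch; the table name is assumed):

    ```scala
    // After this change, refreshing "t" also drops every cached plan that
    // references "t" (cascading), even when "t" itself was never cached.
    spark.catalog.refreshTable("t")
    ```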
    
    ### How was this patch tested?
    
    Added a new UT for the case.
    
    Closes #30187 from sunchao/SPARK-33290.
    
    Authored-by: Chao Sun <su...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 32b78d3795d5c4fd533b0267647977ed4f02ee49)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    | 12 +++++--
 .../org/apache/spark/sql/CachedTableSuite.scala    | 42 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 7ca9fbb..7c76168 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -464,6 +464,9 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * If this table is cached as an InMemoryRelation, drop the original cached version and make the
    * new version cached lazily.
    *
+   * In addition, refreshing a table also invalidates all caches that reference the table,
+   * in a cascading manner. This is to prevent incorrect results from otherwise stale caches.
+   *
    * @group cachemgmt
    * @since 2.0.0
    */
@@ -484,14 +487,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     // If this table is cached as an InMemoryRelation, drop the original
     // cached version and make the new version cached lazily.
     val cache = sparkSession.sharedState.cacheManager.lookupCachedData(table)
+
+    // uncache the logical plan.
+    // note this is a no-op for the table itself if it's not cached, but will invalidate all
+    // caches referencing this table.
+    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
+
     if (cache.nonEmpty) {
       // save the cache name and cache level for recreation
       val cacheName = cache.get.cachedRepresentation.cacheBuilder.tableName
       val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
 
-      // uncache the logical plan.
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
-
       // recache with the same name and cache level.
       sparkSession.sharedState.cacheManager.cacheQuery(table, cacheName, cacheLevel)
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index adc725e..6313370 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1208,4 +1208,46 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
     }
   }
+
+  test("SPARK-33290: REFRESH TABLE should invalidate all caches referencing the table") {
+    withTable("t") {
+      withTempPath { path =>
+        withTempView("tempView1", "tempView2") {
+          Seq((1 -> "a")).toDF("i", "j").write.parquet(path.getCanonicalPath)
+          sql(s"CREATE TABLE t USING parquet LOCATION '${path.toURI}'")
+          sql("CREATE TEMPORARY VIEW tempView1 AS SELECT * FROM t")
+          sql("CACHE TABLE tempView2 AS SELECT i FROM tempView1")
+          checkAnswer(sql("SELECT * FROM tempView1"), Seq(Row(1, "a")))
+          checkAnswer(sql("SELECT * FROM tempView2"), Seq(Row(1)))
+
+          Utils.deleteRecursively(path)
+          sql("REFRESH TABLE tempView1")
+          checkAnswer(sql("SELECT * FROM tempView1"), Seq.empty)
+          checkAnswer(sql("SELECT * FROM tempView2"), Seq.empty)
+        }
+      }
+    }
+  }
+
+  test("SPARK-33290: querying temporary view after REFRESH TABLE fails with FNFE") {
+    withTable("t") {
+      withTempPath { path =>
+        withTempView("tempView1") {
+          Seq((1 -> "a")).toDF("i", "j").write.parquet(path.getCanonicalPath)
+          sql(s"CREATE TABLE t USING parquet LOCATION '${path.toURI}'")
+          sql("CREATE TEMPORARY VIEW tempView1 AS SELECT * FROM t")
+          checkAnswer(sql("SELECT * FROM tempView1"), Seq(Row(1, "a")))
+
+          Utils.deleteRecursively(path)
+          sql("REFRESH TABLE t")
+          checkAnswer(sql("SELECT * FROM t"), Seq.empty)
+          val exception = intercept[Exception] {
+            checkAnswer(sql("SELECT * FROM tempView1"), Seq.empty)
+          }
+          assert(exception.getMessage.contains("FileNotFoundException"))
+          assert(exception.getMessage.contains("REFRESH TABLE"))
+        }
+      }
+    }
+  }
 }

