You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/10/25 23:22:31 UTC

[spark] branch branch-3.0 updated: [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 80716d1  [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
80716d1 is described below

commit 80716d1a216da12bba0ceeab8c11895d37d5559b
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sun Oct 25 16:15:55 2020 -0700

    [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
    
    ### What changes were proposed in this pull request?
    
    SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache when replacing an existing view. However, this change drops the cache even when replacing a view having the same logical plan. A sequence of queries to reproduce this is as follows:
    ```
    // Spark v2.4.6+
    scala> val df = spark.range(1).selectExpr("id a", "id b")
    scala> df.cache()
    scala> df.explain()
    == Physical Plan ==
    *(1) ColumnarToRow
    +- InMemoryTableScan [a#2L, b#3L]
          +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                   +- *(1) Range (0, 1, step=1, splits=4)
    
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    == Physical Plan ==
    *(1) ColumnarToRow
    +- InMemoryTableScan [a#2L, b#3L]
          +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                   +- *(1) Range (0, 1, step=1, splits=4)
    
    // If one re-runs the same query `df.createOrReplaceTempView("t")`, the cache is swept away
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    == Physical Plan ==
    *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
    +- *(1) Range (0, 1, step=1, splits=4)
    
    // Until v2.4.6
    scala> val df = spark.range(1).selectExpr("id a", "id b")
    scala> df.cache()
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    == Physical Plan ==
    *(1) InMemoryTableScan [a#2L, b#3L]
       +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                +- *(1) Range (0, 1, step=1, splits=4)
    
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    == Physical Plan ==
    *(1) InMemoryTableScan [a#2L, b#3L]
       +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                +- *(1) Range (0, 1, step=1, splits=4)
    ```
    
    ### Why are the changes needed?
    
    This is a bug fix: replacing a temp view with an identical logical plan should not invalidate its cache.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests.
    
    Closes #30140 from maropu/FixBugInReplaceView.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 87b498462b82fce02dd50286887092cf7858d2e8)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/execution/command/views.scala | 10 +++++----
 .../org/apache/spark/sql/CachedTableSuite.scala    | 24 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 23f1d6c..0ba76ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -110,17 +110,19 @@ case class CreateViewCommand(
     verifyTemporaryObjectsNotExists(catalog)
 
     if (viewType == LocalTempView) {
-      if (replace && catalog.getTempView(name.table).isDefined) {
-        logDebug(s"Try to uncache ${name.quotedString} before replacing.")
+      if (replace && catalog.getTempView(name.table).isDefined &&
+          !catalog.getTempView(name.table).get.sameResult(child)) {
+        logInfo(s"Try to uncache ${name.quotedString} before replacing.")
         CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
       }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
       catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
-      if (replace && catalog.getGlobalTempView(name.table).isDefined) {
+      if (replace && catalog.getGlobalTempView(name.table).isDefined &&
+          !catalog.getGlobalTempView(name.table).get.sameResult(child)) {
         val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
         val globalTempView = TableIdentifier(name.table, Option(db))
-        logDebug(s"Try to uncache ${globalTempView.quotedString} before replacing.")
+        logInfo(s"Try to uncache ${globalTempView.quotedString} before replacing.")
         CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString)
       }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 20f2a7f..adc725e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1184,4 +1184,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(spark.sharedState.cacheManager.isEmpty)
     }
   }
+
+  test("SPARK-33228: Don't uncache data when replacing an existing view having the same plan") {
+    withTempView("tempView") {
+      spark.catalog.clearCache()
+      val df = spark.range(1).selectExpr("id a", "id b")
+      df.cache()
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceTempView("tempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceTempView("tempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+    }
+
+    withTempView("tempGlobalTempView") {
+      spark.catalog.clearCache()
+      val df = spark.range(1).selectExpr("id a", "id b")
+      df.cache()
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceGlobalTempView("tempGlobalTempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceGlobalTempView("tempGlobalTempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org