You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2019/01/27 19:39:37 UTC

[spark] branch master updated: [SPARK-26708][SQL] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ce7e7df  [SPARK-26708][SQL] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
ce7e7df is described below

commit ce7e7df99d9b33eaabee17c4f6216d2e12848eaf
Author: maryannxue <ma...@apache.org>
AuthorDate: Sun Jan 27 11:39:27 2019 -0800

    [SPARK-26708][SQL] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
    
    ## What changes were proposed in this pull request?
    
    When performing non-cascading cache invalidation, `recache` is called on the other cache entries which are dependent on the cache being invalidated. It leads to the the physical plans of those cache entries being re-compiled. For those cache entries, if the cache RDD has already been persisted, chances are there will be inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning`  or `outputOrdering` is different from the tha [...]
    
    The fix is to keep the cache entry as it is if the data has been loaded, otherwise re-build the cache entry, with a new plan and an empty cache buffer.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #23644 from maryannxue/spark-26708.
    
    Lead-authored-by: maryannxue <ma...@apache.org>
    Co-authored-by: Xiao Li <ga...@gmail.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 28 +++++++++++---
 .../sql/execution/columnar/InMemoryRelation.scala  | 10 +----
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 44 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 0e6627c..00c4461 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -180,7 +180,26 @@ class CacheManager extends Logging {
       val it = cachedData.iterator()
       while (it.hasNext) {
         val cd = it.next()
-        if (condition(cd.plan)) {
+        // If `clearCache` is false (which means the recache request comes from a non-cascading
+        // cache invalidation) and the cache buffer has already been loaded, we do not need to
+        // re-compile a physical plan because the old plan will not be used any more by the
+        // CacheManager although it still lives in compiled `Dataset`s and it could still work.
+        // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
+        // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
+        // buffer is still empty, then we could have a more efficient new plan by removing
+        // dependency on the previously removed cache entries.
+        // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
+        // status test and may not return the most accurate cache buffer state. So the worse case
+        // scenario can be:
+        // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+        //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
+        //    correctness.
+        // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+        //    will keep it as it is. It means the physical plan has been re-compiled already in the
+        //    other thread.
+        val buildNewPlan =
+          clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+        if (condition(cd.plan) && buildNewPlan) {
           needToRecache += cd
           // Remove the cache entry before we create a new one, so that we can have a different
           // physical plan.
@@ -189,12 +208,11 @@ class CacheManager extends Logging {
       }
     }
     needToRecache.map { cd =>
-      if (clearCache) {
-        cd.cachedRepresentation.cacheBuilder.clearCache()
-      }
+      cd.cachedRepresentation.cacheBuilder.clearCache()
       val plan = spark.sessionState.executePlan(cd.plan).executedPlan
       val newCache = InMemoryRelation(
-        cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+        cacheBuilder = cd.cachedRepresentation
+          .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
         logicalPlan = cd.plan)
       val recomputedPlan = cd.copy(cachedRepresentation = newCache)
       writeLock {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 41f406d..efdb82f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -75,14 +75,8 @@ case class CachedRDDBuilder(
     }
   }
 
-  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
-    new CachedRDDBuilder(
-      useCompression,
-      batchSize,
-      storageLevel,
-      cachedPlan = cachedPlan,
-      tableName
-    )(_cachedColumnBuffers)
+  def isCachedColumnBuffersLoaded: Boolean = {
+    _cachedColumnBuffers != null
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index fef6ddd..1ff9f6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
 
     df1.unpersist(blocking = true)
 
-    // df1 un-cached; df2's cache plan re-compiled
+    // df1 un-cached; df2's cache plan stays the same
     assert(df1.storageLevel == StorageLevel.NONE)
-    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)))
 
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
@@ -206,4 +206,44 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-26708 Cache data and cached plan should stay consistent") {
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", 'a + 1)
+    val df2 = df.filter('a > 1)
+
+    df.cache()
+    // Add df1 to the CacheManager; the buffer is currently empty.
+    df1.cache()
+    // After calling collect(), df1's buffer has been loaded.
+    df1.collect()
+    // Add df2 to the CacheManager; the buffer is currently empty.
+    df2.cache()
+
+    // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
+    assertCacheDependency(df1)
+    val df1InnerPlan = df1.queryExecution.withCachedData
+      .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
+    assertCacheDependency(df2)
+
+    df.unpersist(blocking = true)
+
+    // Verify that df1's cache has stayed the same, since df1's cache already has data
+    // before df.unpersist().
+    val df1Limit = df1.limit(2)
+    val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
+
+    // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
+    // on df, since df2's cache had not been loaded before df.unpersist().
+    val df2Limit = df2.limit(2)
+    val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df2LimitInnerPlan.isDefined &&
+      df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org