You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ya...@apache.org on 2019/01/29 12:34:04 UTC

[spark] branch branch-2.4 updated: [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan

This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new d5cc890  [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
d5cc890 is described below

commit d5cc8909c72e958ce187df9c75847ad0125991ab
Author: maryannxue <ma...@apache.org>
AuthorDate: Tue Jan 29 21:33:46 2019 +0900

    [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
    
    ## What changes were proposed in this pull request?
    
    When performing non-cascading cache invalidation, `recache` is called on the other cache entries which are dependent on the cache being invalidated. It leads to the physical plans of those cache entries being re-compiled. For those cache entries, if the cache RDD has already been persisted, chances are there will be inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning` or `outputOrdering` is different from the tha [...]
    
    The fix is to keep the cache entry as it is if the data has been loaded, otherwise re-build the cache entry, with a new plan and an empty cache buffer.
    
    ## How was this patch tested?
    
    Added UT.
    
    Closes #23678 from maryannxue/spark-26708-2.4.
    
    Authored-by: maryannxue <ma...@apache.org>
    Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 28 +++++++++++---
 .../sql/execution/columnar/InMemoryRelation.scala  | 10 +----
 .../org/apache/spark/sql/DatasetCacheSuite.scala   | 44 +++++++++++++++++++++-
 3 files changed, 67 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c992993..5b30596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -166,16 +166,34 @@ class CacheManager extends Logging {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
-        }
+      // If `clearCache` is false (which means the recache request comes from a non-cascading
+      // cache invalidation) and the cache buffer has already been loaded, we do not need to
+      // re-compile a physical plan because the old plan will not be used any more by the
+      // CacheManager although it still lives in compiled `Dataset`s and it could still work.
+      // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
+      // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
+      // buffer is still empty, then we could have a more efficient new plan by removing
+      // dependency on the previously removed cache entries.
+      // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
+      // status test and may not return the most accurate cache buffer state. So the worst case
+      // scenario can be:
+      // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+      //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
+      //    correctness.
+      // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+      //    will keep it as it is. It means the physical plan has been re-compiled already in the
+      //    other thread.
+      val buildNewPlan =
+        clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+      if (condition(cd.plan) && buildNewPlan) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
         // Remove the cache entry before we create a new one, so that we can have a different
         // physical plan.
         it.remove()
         val plan = spark.sessionState.executePlan(cd.plan).executedPlan
         val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+          cacheBuilder = cd.cachedRepresentation
+            .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
           logicalPlan = cd.plan)
         needToRecache += cd.copy(cachedRepresentation = newCache)
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index b752b77..8eecd7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -74,14 +74,8 @@ case class CachedRDDBuilder(
     }
   }
 
-  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
-    new CachedRDDBuilder(
-      useCompression,
-      batchSize,
-      storageLevel,
-      cachedPlan = cachedPlan,
-      tableName
-    )(_cachedColumnBuffers)
+  def isCachedColumnBuffersLoaded: Boolean = {
+    _cachedColumnBuffers != null
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 5c6a021..7c97f5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
 
     df1.unpersist(blocking = true)
 
-    // df1 un-cached; df2's cache plan re-compiled
+    // df1 un-cached; df2's cache plan stays the same
     assert(df1.storageLevel == StorageLevel.NONE)
-    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)))
 
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
@@ -206,4 +206,44 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-26708 Cache data and cached plan should stay consistent") {
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", 'a + 1)
+    val df2 = df.filter('a > 1)
+
+    df.cache()
+    // Add df1 to the CacheManager; the buffer is currently empty.
+    df1.cache()
+    // After calling collect(), df1's buffer has been loaded.
+    df1.collect()
+    // Add df2 to the CacheManager; the buffer is currently empty.
+    df2.cache()
+
+    // Verify that df1 is an InMemoryRelation plan with dependency on another cached plan.
+    assertCacheDependency(df1)
+    val df1InnerPlan = df1.queryExecution.withCachedData
+      .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    // Verify that df2 is an InMemoryRelation plan with dependency on another cached plan.
+    assertCacheDependency(df2)
+
+    df.unpersist(blocking = true)
+
+    // Verify that df1's cache has stayed the same, since df1's cache already has data
+    // before df.unpersist().
+    val df1Limit = df1.limit(2)
+    val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
+
+    // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
+    // on df, since df2's cache had not been loaded before df.unpersist().
+    val df2Limit = df2.limit(2)
+    val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df2LimitInnerPlan.isDefined &&
+      df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org