Posted to commits@spark.apache.org by ya...@apache.org on 2019/01/29 12:34:04 UTC
[spark] branch branch-2.4 updated: [SPARK-26708][SQL][BRANCH-2.4]
Incorrect result caused by inconsistency between a SQL cache's cached RDD
and its physical plan
This is an automated email from the ASF dual-hosted git repository.
yamamuro pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new d5cc890 [SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
d5cc890 is described below
commit d5cc8909c72e958ce187df9c75847ad0125991ab
Author: maryannxue <ma...@apache.org>
AuthorDate: Tue Jan 29 21:33:46 2019 +0900
[SPARK-26708][SQL][BRANCH-2.4] Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
## What changes were proposed in this pull request?
When performing non-cascading cache invalidation, `recache` is called on the other cache entries which depend on the cache being invalidated. This causes the physical plans of those cache entries to be re-compiled. For those cache entries, if the cached RDD has already been persisted, chances are there will be an inconsistency between the data and the new plan. It can cause a correctness issue if the new plan's `outputPartitioning` or `outputOrdering` is different from that of the cached RDD [...]
The fix is to keep the cache entry as it is if the data has already been loaded; otherwise, rebuild the cache entry with a new plan and an empty cache buffer.
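For reference, a minimal sketch of the affected scenario (mirroring the new test below; the local `SparkSession` setup is illustrative, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").appName("spark-26708").getOrCreate()
import spark.implicits._

val df  = spark.range(0, 5).toDF("a")
val df2 = df.filter($"a" > 1)

df.cache()
df2.cache()
df2.collect()  // df2's cache buffer is now materialized

// Non-cascading invalidation: df is un-cached while df2's data stays cached.
// Before this fix, recaching re-compiled df2's physical plan but kept the
// already-persisted RDD, so properties like outputPartitioning could describe
// a plan that no longer matches the data.
df.unpersist(blocking = true)
df2.show()  // must still return rows 2, 3, 4
```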
## How was this patch tested?
Added UT.
Closes #23678 from maryannxue/spark-26708-2.4.
Authored-by: maryannxue <ma...@apache.org>
Signed-off-by: Takeshi Yamamuro <ya...@apache.org>
---
.../apache/spark/sql/execution/CacheManager.scala | 28 +++++++++++---
.../sql/execution/columnar/InMemoryRelation.scala | 10 +----
.../org/apache/spark/sql/DatasetCacheSuite.scala | 44 +++++++++++++++++++++-
3 files changed, 67 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c992993..5b30596 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -166,16 +166,34 @@ class CacheManager extends Logging {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
-        }
+      // If `clearCache` is false (which means the recache request comes from a non-cascading
+      // cache invalidation) and the cache buffer has already been loaded, we do not need to
+      // re-compile a physical plan because the old plan will not be used any more by the
+      // CacheManager although it still lives in compiled `Dataset`s and it could still work.
+      // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
+      // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
+      // buffer is still empty, then we could have a more efficient new plan by removing
+      // dependency on the previously removed cache entries.
+      // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+      // status test and may not return the most accurate cache buffer state. So the worst-case
+      // scenario can be:
+      // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+      //    will clear the buffer and build a new plan. It is inefficient but doesn't affect
+      //    correctness.
+      // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+      //    will keep it as it is. It means the physical plan has been re-compiled already in the
+      //    other thread.
+      val buildNewPlan =
+        clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+      if (condition(cd.plan) && buildNewPlan) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
         // Remove the cache entry before we create a new one, so that we can have a different
         // physical plan.
         it.remove()
         val plan = spark.sessionState.executePlan(cd.plan).executedPlan
         val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+          cacheBuilder = cd.cachedRepresentation
+            .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
           logicalPlan = cd.plan)
         needToRecache += cd.copy(cachedRepresentation = newCache)
       }
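The `isCachedColumnBuffersLoaded` probe above is deliberately non-locking, so it may race with a concurrent `clearCache()`; the hunk's comment argues that both stale outcomes are safe. A self-contained sketch of the same pattern, with hypothetical names (not Spark code):

```scala
// A non-locking "is loaded" probe racing with locked load/clear operations.
// A stale `false` only costs an unnecessary rebuild; a stale `true` means
// another thread has already cleared the buffer and re-planned the entry.
class BufferHolder {
  @volatile private var buffer: Array[Byte] = _

  def isLoaded: Boolean = buffer != null  // non-locking status test

  def load(): Unit = synchronized {
    if (buffer == null) buffer = Array.fill[Byte](16)(0)
  }

  def clear(): Unit = synchronized {
    buffer = null
  }
}
```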
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index b752b77..8eecd7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -74,14 +74,8 @@ case class CachedRDDBuilder(
     }
   }
 
-  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
-    new CachedRDDBuilder(
-      useCompression,
-      batchSize,
-      storageLevel,
-      cachedPlan = cachedPlan,
-      tableName
-    )(_cachedColumnBuffers)
+  def isCachedColumnBuffersLoaded: Boolean = {
+    _cachedColumnBuffers != null
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
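`CachedRDDBuilder` keeps `_cachedColumnBuffers` in a second parameter list, which is why the `CacheManager` call site above must spell it out: Scala's generated `copy` only has defaults for the first parameter list, so the buffer is reset explicitly via `(_cachedColumnBuffers = null)`. A toy model of the idiom, with hypothetical names:

```scala
// State in a second parameter list is excluded from equals/hashCode and has
// no default in the generated copy, so callers must reset it explicitly.
case class Builder(plan: String)(var buffer: Option[Seq[Int]]) {
  def isLoaded: Boolean = buffer.isDefined
}

val loaded  = Builder("old plan")(Some(Seq(1, 2, 3)))
val rebuilt = loaded.copy(plan = "new plan")(buffer = None)  // buffer dropped on purpose
assert(!rebuilt.isLoaded)
```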
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 5c6a021..7c97f5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     df1.unpersist(blocking = true)
 
-    // df1 un-cached; df2's cache plan re-compiled
+    // df1 un-cached; df2's cache plan stays the same
     assert(df1.storageLevel == StorageLevel.NONE)
-    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)))
 
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
@@ -206,4 +206,44 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-26708 Cache data and cached plan should stay consistent") {
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", 'a + 1)
+    val df2 = df.filter('a > 1)
+
+    df.cache()
+    // Add df1 to the CacheManager; the buffer is currently empty.
+    df1.cache()
+    // After calling collect(), df1's buffer has been loaded.
+    df1.collect()
+    // Add df2 to the CacheManager; the buffer is currently empty.
+    df2.cache()
+
+    // Verify that df1 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df1)
+    val df1InnerPlan = df1.queryExecution.withCachedData
+      .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    // Verify that df2 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df2)
+
+    df.unpersist(blocking = true)
+
+    // Verify that df1's cache has stayed the same, since df1's cache already has data
+    // before df.unpersist().
+    val df1Limit = df1.limit(2)
+    val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
+
+    // Verify that df2's cache has been re-cached, with a new physical plan rid of the
+    // dependency on df, since df2's cache had not been loaded before df.unpersist().
+    val df2Limit = df2.limit(2)
+    val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df2LimitInnerPlan.isDefined &&
+      df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+  }
 }
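To exercise the new suite from a Spark source checkout, the usual sbt invocation should work (wrapper path and test filter assumed from the standard Spark build):

build/sbt "sql/testOnly *DatasetCacheSuite"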