You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/01/31 03:05:02 UTC
[spark] branch master updated: [SPARK-26708][SQL][FOLLOWUP] put the
special handling of non-cascade uncache in the uncache method
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d8d2736 [SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
d8d2736 is described below
commit d8d2736fd1e3cd47941153327ad50a4d36099476
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Thu Jan 31 11:04:33 2019 +0800
[SPARK-26708][SQL][FOLLOWUP] put the special handling of non-cascade uncache in the uncache method
## What changes were proposed in this pull request?
This is a follow-up to https://github.com/apache/spark/pull/23644/files, to make these methods less coupled with each other.
## How was this patch tested?
existing tests
Closes #23687 from cloud-fan/cache.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/sql/execution/CacheManager.scala | 48 +++++++++++-----------
1 file changed, 23 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 00c4461..398d7b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -160,7 +160,22 @@ class CacheManager extends Logging {
}
// Re-compile dependent cached queries after removing the cached query.
if (!cascade) {
- recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
+ recacheByCondition(spark, cd => {
+ // If the cache buffer has already been loaded, we don't need to recompile the cached plan,
+ // as it does not rely on the plan that has been uncached anymore, it will just produce
+ // data from the cache buffer.
+ // Note that the `CachedRDDBuilder.isCachedColumnBuffersLoaded` call is a non-locking
+ // status test and may not return the most accurate cache buffer state. So the worst case
+ // scenario can be:
+ // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
+ // will clear the buffer and re-compile the plan. It is inefficient but doesn't affect
+ // correctness.
+ // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
+ // will keep it as it is. It means the physical plan has been re-compiled already in the
+ // other thread.
+ val cacheAlreadyLoaded = cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+ cd.plan.find(_.sameResult(plan)).isDefined && !cacheAlreadyLoaded
+ })
}
}
@@ -168,38 +183,21 @@ class CacheManager extends Logging {
* Tries to re-cache all the cache entries that refer to the given plan.
*/
def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
- recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
+ recacheByCondition(spark, _.plan.find(_.sameResult(plan)).isDefined)
}
+ /**
+ * Re-caches all the cache entries that satisfy the given `condition`.
+ */
private def recacheByCondition(
spark: SparkSession,
- condition: LogicalPlan => Boolean,
- clearCache: Boolean = true): Unit = {
+ condition: CachedData => Boolean): Unit = {
val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
writeLock {
val it = cachedData.iterator()
while (it.hasNext) {
val cd = it.next()
- // If `clearCache` is false (which means the recache request comes from a non-cascading
- // cache invalidation) and the cache buffer has already been loaded, we do not need to
- // re-compile a physical plan because the old plan will not be used any more by the
- // CacheManager although it still lives in compiled `Dataset`s and it could still work.
- // Otherwise, it means either `clearCache` is true, then we have to clear the cache buffer
- // and re-compile the physical plan; or it is a non-cascading cache invalidation and cache
- // buffer is still empty, then we could have a more efficient new plan by removing
- // dependency on the previously removed cache entries.
- // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
- // status test and may not return the most accurate cache buffer state. So the worse case
- // scenario can be:
- // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false, then we
- // will clear the buffer and build a new plan. It is inefficient but doesn't affect
- // correctness.
- // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true, then we
- // will keep it as it is. It means the physical plan has been re-compiled already in the
- // other thread.
- val buildNewPlan =
- clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
- if (condition(cd.plan) && buildNewPlan) {
+ if (condition(cd)) {
needToRecache += cd
// Remove the cache entry before we create a new one, so that we can have a different
// physical plan.
@@ -267,7 +265,7 @@ class CacheManager extends Logging {
(fs, fs.makeQualified(path))
}
- recacheByCondition(spark, _.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
+ recacheByCondition(spark, _.plan.find(lookupAndRefresh(_, fs, qualifiedPath)).isDefined)
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org