Posted to commits@spark.apache.org by li...@apache.org on 2019/01/06 03:20:50 UTC

[spark] branch master updated: [SPARK-26548][SQL] Don't hold CacheManager write lock while computing executedPlan

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a17851c  [SPARK-26548][SQL] Don't hold CacheManager write lock while computing executedPlan
a17851c is described below

commit a17851cb95687963936c4d4a7eed132ee2c10677
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Sat Jan 5 19:20:35 2019 -0800

    [SPARK-26548][SQL] Don't hold CacheManager write lock while computing executedPlan
    
    ## What changes were proposed in this pull request?
    
    Addresses SPARK-26548. In Spark 2.4.0, the CacheManager holds a write lock while computing the executedPlan for a cached logicalPlan. For very large query plans this can be an expensive operation, taking minutes to run, and the entire cache is blocked during that time. This PR changes that so the writeLock is obtained only after the executedPlan has been generated, which reduces the time the lock is held to just the time needed to update the shared data structure. Because the plan is now computed outside the lock, the cache is checked again under the writeLock before the new entry is added.
    
    gatorsmile and cloud-fan - You have committed patches in this area before.  This is a small incremental change.
    
    ## How was this patch tested?
    
    This has been tested on a live system where the blocking was causing major issues, and it is working well there.
    CacheManager has no explicit unit test, but it is used in many places internally as part of the SharedState.
    
    Closes #23469 from DaveDeCaprio/optimizer-unblocked.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../scala/org/apache/spark/sql/execution/CacheManager.scala    | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index c992993..728fde5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -88,7 +88,7 @@ class CacheManager extends Logging {
   def cacheQuery(
       query: Dataset[_],
       tableName: Option[String] = None,
-      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
+      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = {
     val planToCache = query.logicalPlan
     if (lookupCachedData(planToCache).nonEmpty) {
       logWarning("Asked to cache already cached data.")
@@ -100,7 +100,13 @@ class CacheManager extends Logging {
         sparkSession.sessionState.executePlan(planToCache).executedPlan,
         tableName,
         planToCache)
-      cachedData.add(CachedData(planToCache, inMemoryRelation))
+      writeLock {
+        if (lookupCachedData(planToCache).nonEmpty) {
+          logWarning("Data has already been cached.")
+        } else {
+          cachedData.add(CachedData(planToCache, inMemoryRelation))
+        }
+      }
     }
   }
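
The locking pattern in the diff above can be sketched in isolation as follows. This is a minimal illustration, not Spark code: `PlanCache`, `lookup`, and `cache` are hypothetical names, and the by-name `compute` argument stands in for the expensive executedPlan computation. The assumption is a plain `java.util.concurrent.locks.ReentrantReadWriteLock` guarding a mutable map.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

// Hypothetical stand-in for CacheManager; the names are illustrative only.
class PlanCache[K, V] {
  private val lock = new ReentrantReadWriteLock
  private val entries = mutable.HashMap.empty[K, V]

  private def readLock[A](body: => A): A = {
    val l = lock.readLock()
    l.lock()
    try body finally l.unlock()
  }

  private def writeLock[A](body: => A): A = {
    val l = lock.writeLock()
    l.lock()
    try body finally l.unlock()
  }

  def lookup(key: K): Option[V] = readLock(entries.get(key))

  /** Returns true if this call added the entry, false if it was already cached. */
  def cache(key: K)(compute: => V): Boolean = {
    if (lookup(key).nonEmpty) {
      false
    } else {
      // The expensive step runs with no lock held, so other callers are not blocked.
      val value = compute
      writeLock {
        // Check again: another thread may have added the key while we were computing.
        if (entries.contains(key)) {
          false
        } else {
          entries.put(key, value)
          true
        }
      }
    }
  }
}

object PlanCacheDemo {
  def main(args: Array[String]): Unit = {
    val cache = new PlanCache[String, Int]
    println(cache.cache("q1")(42)) // first call adds the entry
    println(cache.cache("q1")(43)) // second call finds it already cached
    println(cache.lookup("q1"))
  }
}
```

The trade-off is that two threads caching the same key concurrently may both run the expensive computation; the second check under the write lock only guarantees that a single entry is stored.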
 

