Posted to commits@spark.apache.org by we...@apache.org on 2019/01/24 02:49:17 UTC

[spark] branch master updated: [SPARK-26617][SQL] Cache manager locks

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d0e9219  [SPARK-26617][SQL] Cache manager locks
d0e9219 is described below

commit d0e9219e03373b0db918d51bafe6a97775e2c65f
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Thu Jan 24 10:48:48 2019 +0800

    [SPARK-26617][SQL] Cache manager locks
    
    ## What changes were proposed in this pull request?
    
    Fixed several places in CacheManager where a write lock was being held while running the query optimizer. This could block other threads for a very long time if query optimization takes a long time. This builds on the changes from [SPARK-26548] that fixed this issue for one specific case in the CacheManager.
    
    @gatorsmile, this is very similar to the PR you approved last week.
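    For illustration, a minimal, self-contained sketch of the locking pattern applied here, using made-up names (`Entry`, `removeMatching`) rather than the real CacheManager API: matching entries are collected and unlinked while the write lock is held, and the potentially slow cache-clearing work runs only after the lock has been released.
    
    ```scala
    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable
    
    object LockNarrowingSketch {
      // Stand-in for CachedData; the real entry carries a logical plan and an
      // InMemoryRelation whose cacheBuilder performs the (slow) clearCache call.
      final case class Entry(name: String, release: () => Unit)
    
      private val entries = new java.util.LinkedList[Entry]()
      private val lock = new ReentrantReadWriteLock
    
      private def writeLock[A](f: => A): A = {
        val l = lock.writeLock()
        l.lock()
        try f finally l.unlock()
      }
    
      def add(e: Entry): Unit = writeLock { entries.add(e); () }
    
      // Collect and unlink matching entries under the write lock, then run the
      // potentially slow release work with no lock held.
      def removeMatching(shouldRemove: Entry => Boolean): Unit = {
        val toRelease = mutable.Buffer[Entry]()
        writeLock {
          val it = entries.iterator()
          while (it.hasNext) {
            val e = it.next()
            if (shouldRemove(e)) {
              toRelease += e
              it.remove()
            }
          }
        }
        toRelease.foreach(_.release())
      }
    }
    ```
    
    The shape mirrors the patch: the lock is held only long enough to mutate the list, while calls that may touch the storage layer stay outside the critical section.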
    
    ## How was this patch tested?
    
    This has been tested on a live system where the blocking was causing major issues, and it is working well.
    CacheManager has no explicit unit test, but it is exercised in many places internally as part of the SharedState.
    
    Closes #23539 from DaveDeCaprio/cache-manager-locks.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 69 ++++++++++++++--------
 1 file changed, 43 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 728fde5..0e6627c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -120,7 +121,7 @@ class CacheManager extends Logging {
   def uncacheQuery(
       query: Dataset[_],
       cascade: Boolean,
-      blocking: Boolean = true): Unit = writeLock {
+      blocking: Boolean = true): Unit = {
     uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
@@ -136,21 +137,27 @@ class CacheManager extends Logging {
       spark: SparkSession,
       plan: LogicalPlan,
       cascade: Boolean,
-      blocking: Boolean): Unit = writeLock {
+      blocking: Boolean): Unit = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
         _.find(_.sameResult(plan)).isDefined
       } else {
         _.sameResult(plan)
       }
-    val it = cachedData.iterator()
-    while (it.hasNext) {
-      val cd = it.next()
-      if (shouldRemove(cd.plan)) {
-        cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
-        it.remove()
+    val plansToUncache = mutable.Buffer[CachedData]()
+    writeLock {
+      val it = cachedData.iterator()
+      while (it.hasNext) {
+        val cd = it.next()
+        if (shouldRemove(cd.plan)) {
+          plansToUncache += cd
+          it.remove()
+        }
       }
     }
+    plansToUncache.foreach { cd =>
+      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+    }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
@@ -160,7 +167,7 @@ class CacheManager extends Logging {
   /**
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
-  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = writeLock {
+  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
     recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
   }
 
@@ -168,26 +175,36 @@ class CacheManager extends Logging {
       spark: SparkSession,
       condition: LogicalPlan => Boolean,
       clearCache: Boolean = true): Unit = {
-    val it = cachedData.iterator()
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    while (it.hasNext) {
-      val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
+    writeLock {
+      val it = cachedData.iterator()
+      while (it.hasNext) {
+        val cd = it.next()
+        if (condition(cd.plan)) {
+          needToRecache += cd
+          // Remove the cache entry before we create a new one, so that we can have a different
+          // physical plan.
+          it.remove()
+        }
+      }
+    }
+    needToRecache.map { cd =>
+      if (clearCache) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
+      }
+      val plan = spark.sessionState.executePlan(cd.plan).executedPlan
+      val newCache = InMemoryRelation(
+        cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+        logicalPlan = cd.plan)
+      val recomputedPlan = cd.copy(cachedRepresentation = newCache)
+      writeLock {
+        if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
+          logWarning("While recaching, data was already added to cache.")
+        } else {
+          cachedData.add(recomputedPlan)
         }
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        it.remove()
-        val plan = spark.sessionState.executePlan(cd.plan).executedPlan
-        val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
-          logicalPlan = cd.plan)
-        needToRecache += cd.copy(cachedRepresentation = newCache)
       }
     }
-
-    needToRecache.foreach(cachedData.add)
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
@@ -225,7 +242,7 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
     val (fs, qualifiedPath) = {
       val path = new Path(resourcePath)
       val fs = path.getFileSystem(spark.sessionState.newHadoopConf())


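In the recacheByCondition change above, the rebuilt entry is re-added only after a lookupCachedData check under the write lock: if another caller cached the same plan while the lock-free replanning ran, the recomputed entry is dropped and a warning is logged. For illustration, a minimal, self-contained sketch of that check-then-add step, with made-up names (Entry, rebuildAndReadd) standing in for CachedData and the InMemoryRelation rebuild:

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable

object RecacheSketch {
  final case class Entry(key: String, payload: String)

  private val entries = mutable.ListBuffer[Entry]()
  private val lock = new ReentrantReadWriteLock

  private def writeLock[A](f: => A): A = {
    val l = lock.writeLock()
    l.lock()
    try f finally l.unlock()
  }

  // Rebuild `stale` with no lock held (the expensive replanning step), then
  // take the write lock only to re-check and add.
  def rebuildAndReadd(stale: Entry, rebuild: Entry => Entry): Unit = {
    val rebuilt = rebuild(stale)
    writeLock {
      if (entries.exists(_.key == rebuilt.key)) {
        // Someone re-cached the same key while we were rebuilding; keep theirs.
        println("While recaching, data was already added to cache.")
      } else {
        entries += rebuilt
      }
    }
  }
}
```

Keeping the replanning (spark.sessionState.executePlan in the real code) outside the lock is what removes the long blocking; the re-check under the lock is what keeps the cache consistent despite that.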
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org