You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2019/03/15 02:13:58 UTC

[spark] branch master updated: [SPARK-26917][SQL] Further reduce locks in CacheManager

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8819eab  [SPARK-26917][SQL] Further reduce locks in CacheManager
8819eab is described below

commit 8819eaba4db9a20c60a423fe08bee82cb8595ad0
Author: Dave DeCaprio <da...@alum.mit.edu>
AuthorDate: Fri Mar 15 10:13:34 2019 +0800

    [SPARK-26917][SQL] Further reduce locks in CacheManager
    
    ## What changes were proposed in this pull request?
    
    Further load increases in our production environment have shown that even the read locks can cause some contention, since they contain a mechanism that turns a read lock into an exclusive lock if a writer has been starved out.  This PR reduces the potential for lock contention even further than https://github.com/apache/spark/pull/23833.  Additionally, it uses more idiomatic scala than the previous implementation.
    
    cloud-fan & gatorsmile This is a relatively minor improvement to the previous CacheManager changes.  At this point, I think we finally are doing the minimum possible amount of locking.
    
    ## How was this patch tested?
    
    Has been tested on a live system where the blocking was causing major issues and it is working well.
    CacheManager has no explicit unit test but is used in many places internally as part of the SharedState.
    
    Closes #24028 from DaveDeCaprio/read-locks-master.
    
    Lead-authored-by: Dave DeCaprio <da...@alum.mit.edu>
    Co-authored-by: David DeCaprio <da...@alum.mit.edu>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 93 +++++++---------------
 1 file changed, 28 insertions(+), 65 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 2815a28..5a11a8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -19,8 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.immutable.IndexedSeq
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -46,38 +45,22 @@ case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation)
  */
 class CacheManager extends Logging {
 
-  @transient
-  private val cachedData = new java.util.LinkedList[CachedData]
-
-  @transient
-  private val cacheLock = new ReentrantReadWriteLock
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val lock = cacheLock.readLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
-
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val lock = cacheLock.writeLock()
-    lock.lock()
-    try f finally {
-      lock.unlock()
-    }
-  }
+  /**
+   * Maintains the list of cached plans as an immutable sequence.  Any updates to the list
+   * should be protected in a "this.synchronized" block which includes the reading of the
+   * existing value and the update of the cachedData var.
+   */
+  @transient @volatile
+  private var cachedData = IndexedSeq[CachedData]()
 
   /** Clears all cached tables. */
-  def clearCache(): Unit = writeLock {
-    cachedData.asScala.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
-    cachedData.clear()
+  def clearCache(): Unit = this.synchronized {
+    cachedData.foreach(_.cachedRepresentation.cacheBuilder.clearCache())
+    cachedData = IndexedSeq[CachedData]()
   }
 
   /** Checks if the cache is empty. */
-  def isEmpty: Boolean = readLock {
+  def isEmpty: Boolean = {
     cachedData.isEmpty
   }
 
@@ -101,11 +84,11 @@ class CacheManager extends Logging {
         sparkSession.sessionState.executePlan(planToCache).executedPlan,
         tableName,
         planToCache)
-      writeLock {
+      this.synchronized {
         if (lookupCachedData(planToCache).nonEmpty) {
           logWarning("Data has already been cached.")
         } else {
-          cachedData.add(CachedData(planToCache, inMemoryRelation))
+          cachedData = CachedData(planToCache, inMemoryRelation) +: cachedData
         }
       }
     }
@@ -144,22 +127,12 @@ class CacheManager extends Logging {
       } else {
         _.sameResult(plan)
       }
-    val plansToUncache = mutable.Buffer[CachedData]()
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (shouldRemove(cd.plan)) {
-          plansToUncache += cd
-        }
-      }
-    }
-    plansToUncache.foreach { cd =>
-      writeLock {
-        cachedData.remove(cd)
-      }
-      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+    val plansToUncache = cachedData.filter(cd => shouldRemove(cd.plan))
+    this.synchronized {
+      cachedData = cachedData.filterNot(cd => plansToUncache.exists(_ eq cd))
     }
+    plansToUncache.foreach { _.cachedRepresentation.cacheBuilder.clearCache(blocking) }
+
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, cd => {
@@ -194,46 +167,36 @@ class CacheManager extends Logging {
   private def recacheByCondition(
       spark: SparkSession,
       condition: CachedData => Boolean): Unit = {
-    val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    readLock {
-      val it = cachedData.iterator()
-      while (it.hasNext) {
-        val cd = it.next()
-        if (condition(cd)) {
-          needToRecache += cd
-        }
-      }
+    val needToRecache = cachedData.filter(condition)
+    this.synchronized {
+      // Remove the cache entry before creating a new ones.
+      cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
     }
     needToRecache.map { cd =>
-      writeLock {
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        cachedData.remove(cd)
-      }
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val plan = spark.sessionState.executePlan(cd.plan).executedPlan
       val newCache = InMemoryRelation(
         cacheBuilder = cd.cachedRepresentation.cacheBuilder.copy(cachedPlan = plan),
         logicalPlan = cd.plan)
       val recomputedPlan = cd.copy(cachedRepresentation = newCache)
-      writeLock {
+      this.synchronized {
         if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
           logWarning("While recaching, data was already added to cache.")
         } else {
-          cachedData.add(recomputedPlan)
+          cachedData = recomputedPlan +: cachedData
         }
       }
     }
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
-  def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
+  def lookupCachedData(query: Dataset[_]): Option[CachedData] = {
     lookupCachedData(query.logicalPlan)
   }
 
   /** Optionally returns cached data for the given [[LogicalPlan]]. */
-  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock {
-    cachedData.asScala.find(cd => plan.sameResult(cd.plan))
+  def lookupCachedData(plan: LogicalPlan): Option[CachedData] = {
+    cachedData.find(cd => plan.sameResult(cd.plan))
   }
 
   /** Replaces segments of the given logical plan with cached versions where possible. */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org