Posted to commits@spark.apache.org by an...@apache.org on 2015/06/07 06:13:29 UTC

spark git commit: [SPARK-7955] [CORE] Ensure executors with cached RDD blocks are not removed if dynamic allocation is enabled

Repository: spark
Updated Branches:
  refs/heads/master ed2cc3ee8 -> 3285a5112


[SPARK-7955] [CORE] Ensure executors with cached RDD blocks are not removed if dynamic allocation is enabled.

This is a work in progress. This patch ensures that an executor that has cached RDD blocks is not removed,
but makes no attempt to find another executor to remove instead. It is meant to gather feedback on the current
approach; if the approach makes sense, I will look at choosing another executor to remove. No testing has been done yet.
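
For reviewers who want to try this out, here is a minimal sketch of how an application might opt in once this lands. The cachedExecutorIdleTimeout key is the one added by this patch (defaulting to 2 * executorIdleTimeout when unset); the other settings are the standard dynamic-allocation knobs, and the timeout values shown are illustrative, not recommendations:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("cached-executor-idle-timeout-demo")
    .set("spark.dynamicAllocation.enabled", "true")
    .set("spark.shuffle.service.enabled", "true")              // dynamic allocation requires the external shuffle service
    .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // idle executors without cached blocks
    // Added by this patch; defaults to 2 * executorIdleTimeout when unset.
    .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "300s")
  val sc = new SparkContext(conf)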

Author: Hari Shreedharan <hs...@apache.org>

Closes #6508 from harishreedharan/dymanic-caching and squashes the following commits:

dddf1eb [Hari Shreedharan] Minor configuration description update.
10130e2 [Hari Shreedharan] Fix compile issue.
5417b53 [Hari Shreedharan] Add documentation for new config. Remove block from cachedBlocks when it is dropped.
875916a [Hari Shreedharan] Make some code more readable.
39940ca [Hari Shreedharan] Handle the case where the executor has not yet registered.
90ad711 [Hari Shreedharan] Remove unused imports and unused methods.
063985c [Hari Shreedharan] Send correct message instead of recursively calling same method.
ec2fd7e [Hari Shreedharan] Add file missed in last commit
5d10fad [Hari Shreedharan] Update cached blocks status using local info, rather than doing an RPC.
193af4c [Hari Shreedharan] WIP. Use local state rather than via RPC.
ae932ff [Hari Shreedharan] Fix config param name.
272969d [Hari Shreedharan] Fix seconds to millis bug.
5a1993f [Hari Shreedharan] Add timeout for cache executors. Ignore broadcast blocks while checking if there are cached blocks.
57fefc2 [Hari Shreedharan] [SPARK-7955][Core] Ensure executors with cached RDD blocks are not removed if dynamic allocation is enabled.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3285a511
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3285a511
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3285a511

Branch: refs/heads/master
Commit: 3285a51121397bfd2e62dbee8e1f0fa7c72512a7
Parents: ed2cc3e
Author: Hari Shreedharan <hs...@apache.org>
Authored: Sat Jun 6 21:13:26 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Sat Jun 6 21:13:26 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 21 +++++++++++--
 .../spark/storage/BlockManagerMaster.scala      |  8 +++++
 .../storage/BlockManagerMasterEndpoint.scala    | 33 ++++++++++++++++++--
 .../spark/storage/BlockManagerMessages.scala    |  3 +-
 docs/configuration.md                           |  9 ++++++
 5 files changed, 68 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3285a511/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 9939103..4932942 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -101,6 +101,9 @@ private[spark] class ExecutorAllocationManager(
   private val executorIdleTimeoutS = conf.getTimeAsSeconds(
     "spark.dynamicAllocation.executorIdleTimeout", "60s")
 
+  private val cachedExecutorIdleTimeoutS = conf.getTimeAsSeconds(
+    "spark.dynamicAllocation.cachedExecutorIdleTimeout", s"${2 * executorIdleTimeoutS}s")
+
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)
 
@@ -459,9 +462,23 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorIdle(executorId: String): Unit = synchronized {
     if (executorIds.contains(executorId)) {
       if (!removeTimes.contains(executorId) && !executorsPendingToRemove.contains(executorId)) {
+        // Note that it is not necessary to query the executors since all the cached
+        // blocks we are concerned with are reported to the driver. Note that this
+        // does not include broadcast blocks.
+        val hasCachedBlocks = SparkEnv.get.blockManager.master.hasCachedBlocks(executorId)
+        val now = clock.getTimeMillis()
+        val timeout = {
+          if (hasCachedBlocks) {
+            // Use a different timeout if the executor has cached blocks.
+            now + cachedExecutorIdleTimeoutS * 1000
+          } else {
+            now + executorIdleTimeoutS * 1000
+          }
+        }
+        val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // overflow
+        removeTimes(executorId) = realTimeout
         logDebug(s"Starting idle timer for $executorId because there are no more tasks " +
-          s"scheduled to run on the executor (to expire in $executorIdleTimeoutS seconds)")
-        removeTimes(executorId) = clock.getTimeMillis + executorIdleTimeoutS * 1000
+          s"scheduled to run on the executor (to expire in ${(realTimeout - now)/1000} seconds)")
       }
     } else {
       logWarning(s"Attempted to mark unknown executor $executorId idle")

http://git-wip-us.apache.org/repos/asf/spark/blob/3285a511/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index abcad94..7cdae22 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -202,6 +202,14 @@ class BlockManagerMaster(
     Await.result(future, timeout)
   }
 
+  /**
+   * Find out if the executor has cached blocks. This method does not consider broadcast blocks,
+   * since they are not reported to the master.
+   */
+  def hasCachedBlocks(executorId: String): Boolean = {
+    driverEndpoint.askWithRetry[Boolean](HasCachedBlocks(executorId))
+  }
+
   /** Stop the driver endpoint, called only on the Spark driver node */
   def stop() {
     if (driverEndpoint != null && isDriver) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3285a511/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 2cd8c52..68ed909 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.util.{HashMap => JHashMap}
 
+import scala.collection.immutable.HashSet
 import scala.collection.mutable
 import scala.collection.JavaConversions._
 import scala.concurrent.{ExecutionContext, Future}
@@ -112,6 +113,17 @@ class BlockManagerMasterEndpoint(
     case BlockManagerHeartbeat(blockManagerId) =>
       context.reply(heartbeatReceived(blockManagerId))
 
+    case HasCachedBlocks(executorId) =>
+      blockManagerIdByExecutor.get(executorId) match {
+        case Some(bm) =>
+          if (blockManagerInfo.contains(bm)) {
+            val bmInfo = blockManagerInfo(bm)
+            context.reply(bmInfo.cachedBlocks.nonEmpty)
+          } else {
+            context.reply(false)
+          }
+        case None => context.reply(false)
+      }
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -418,6 +430,9 @@ private[spark] class BlockManagerInfo(
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
 
+  // Cached blocks held by this BlockManager. This does not include broadcast blocks.
+  private val _cachedBlocks = new mutable.HashSet[BlockId]
+
   def getStatus(blockId: BlockId): Option[BlockStatus] = Option(_blocks.get(blockId))
 
   def updateLastSeenMs() {
@@ -451,27 +466,35 @@ private[spark] class BlockManagerInfo(
        * and the diskSize here indicates the data size in or dropped to disk.
        * They can be both larger than 0, when a block is dropped from memory to disk.
        * Therefore, a safe way to set BlockStatus is to set its info in accurate modes. */
+      var blockStatus: BlockStatus = null
       if (storageLevel.useMemory) {
-        _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0, 0))
+        blockStatus = BlockStatus(storageLevel, memSize, 0, 0)
+        _blocks.put(blockId, blockStatus)
         _remainingMem -= memSize
         logInfo("Added %s in memory on %s (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(memSize),
           Utils.bytesToString(_remainingMem)))
       }
       if (storageLevel.useDisk) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize, 0))
+        blockStatus = BlockStatus(storageLevel, 0, diskSize, 0)
+        _blocks.put(blockId, blockStatus)
         logInfo("Added %s on disk on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize)))
       }
       if (storageLevel.useOffHeap) {
-        _blocks.put(blockId, BlockStatus(storageLevel, 0, 0, externalBlockStoreSize))
+        blockStatus = BlockStatus(storageLevel, 0, 0, externalBlockStoreSize)
+        _blocks.put(blockId, blockStatus)
         logInfo("Added %s on ExternalBlockStore on %s (size: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(externalBlockStoreSize)))
       }
+      if (!blockId.isBroadcast && blockStatus.isCached) {
+        _cachedBlocks += blockId
+      }
     } else if (_blocks.containsKey(blockId)) {
       // If isValid is not true, drop the block.
       val blockStatus: BlockStatus = _blocks.get(blockId)
       _blocks.remove(blockId)
+      _cachedBlocks -= blockId
       if (blockStatus.storageLevel.useMemory) {
         logInfo("Removed %s on %s in memory (size: %s, free: %s)".format(
           blockId, blockManagerId.hostPort, Utils.bytesToString(blockStatus.memSize),
@@ -494,6 +517,7 @@ private[spark] class BlockManagerInfo(
       _remainingMem += _blocks.get(blockId).memSize
       _blocks.remove(blockId)
     }
+    _cachedBlocks -= blockId
   }
 
   def remainingMem: Long = _remainingMem
@@ -502,6 +526,9 @@ private[spark] class BlockManagerInfo(
 
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
+  // This does not include broadcast blocks.
+  def cachedBlocks: collection.Set[BlockId] = _cachedBlocks
+
   override def toString: String = "BlockManagerInfo " + timeMs + " " + _remainingMem
 
   def clear() {
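
To summarize the BlockManagerInfo changes above: the master now maintains a per-block-manager set of cached, non-broadcast block ids, updated on every block status report and cleared on block removal, so HasCachedBlocks can be answered from driver-local state. A simplified, self-contained sketch of that bookkeeping (plain Scala; the real code keys the set by BlockId and derives the cached flag from BlockStatus):

  import scala.collection.mutable

  class CachedBlockTrackerSketch {
    private val cachedBlocks = new mutable.HashSet[String]

    // Mirrors updateBlockInfo: track cached non-broadcast blocks, drop everything else.
    def updateBlock(blockId: String, isBroadcast: Boolean, isCached: Boolean): Unit = {
      if (!isBroadcast && isCached) cachedBlocks += blockId
      else cachedBlocks -= blockId
    }

    // Mirrors removeBlock: always clear the id from the cached set.
    def removeBlock(blockId: String): Unit = cachedBlocks -= blockId

    def hasCachedBlocks: Boolean = cachedBlocks.nonEmpty
  }

  object CachedBlockTrackerSketch {
    def main(args: Array[String]): Unit = {
      val t = new CachedBlockTrackerSketch
      t.updateBlock("rdd_0_0", isBroadcast = false, isCached = true)
      t.updateBlock("broadcast_1", isBroadcast = true, isCached = true) // broadcast: ignored
      println(t.hasCachedBlocks) // true
      t.removeBlock("rdd_0_0")
      println(t.hasCachedBlocks) // false
    }
  }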

http://git-wip-us.apache.org/repos/asf/spark/blob/3285a511/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 1683576..376e9eb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -42,7 +42,6 @@ private[spark] object BlockManagerMessages {
   case class RemoveBroadcast(broadcastId: Long, removeFromDriver: Boolean = true)
     extends ToBlockManagerSlave
 
-
   //////////////////////////////////////////////////////////////////////////////////
   // Messages from slaves to the master.
   //////////////////////////////////////////////////////////////////////////////////
@@ -108,4 +107,6 @@ private[spark] object BlockManagerMessages {
     extends ToBlockManagerMaster
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
+
+  case class HasCachedBlocks(executorId: String) extends ToBlockManagerMaster
 }
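
Putting the pieces together: the HasCachedBlocks message defined above is answered entirely from the master's own maps, with unregistered executors defaulting to false. A self-contained sketch of that reply logic (plain Scala, using ordinary Maps in place of the endpoint's blockManagerIdByExecutor and blockManagerInfo state):

  case class HasCachedBlocks(executorId: String)

  object MasterReplySketch {
    // Stand-ins for the endpoint's registration and block-status maps.
    val blockManagerIdByExecutor = Map("exec-1" -> "bm-1")
    val cachedBlocksByManager = Map("bm-1" -> Set("rdd_0_0"))

    def reply(msg: HasCachedBlocks): Boolean =
      blockManagerIdByExecutor.get(msg.executorId)
        .flatMap(cachedBlocksByManager.get)
        .exists(_.nonEmpty)

    def main(args: Array[String]): Unit = {
      println(reply(HasCachedBlocks("exec-1"))) // true
      println(reply(HasCachedBlocks("exec-9"))) // false: executor not registered
    }
  }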

http://git-wip-us.apache.org/repos/asf/spark/blob/3285a511/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 3a48da4..9667ceb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1202,6 +1202,15 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
+  <td><code>spark.dynamicAllocation.cachedExecutorIdleTimeout</code></td>
+  <td>2 * executorIdleTimeout</td>
+  <td>
+    If dynamic allocation is enabled and an executor which has cached data blocks has been idle for more than this duration,
+    the executor will be removed. For more details, see this
+    <a href="job-scheduling.html#resource-allocation-policy">description</a>.
+  </td>
+</tr>
+<tr>
   <td><code>spark.dynamicAllocation.initialExecutors</code></td>
   <td><code>spark.dynamicAllocation.minExecutors</code></td>
   <td>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org