You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/07/16 23:38:00 UTC

[spark] branch master updated: [SPARK-27963][CORE] Allow dynamic allocation without a shuffle service.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ddeff9  [SPARK-27963][CORE] Allow dynamic allocation without a shuffle service.
2ddeff9 is described below

commit 2ddeff97d7329942a98ef363991eeabc3fa71a76
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Tue Jul 16 16:37:38 2019 -0700

    [SPARK-27963][CORE] Allow dynamic allocation without a shuffle service.
    
    This change adds a new option that enables dynamic allocation without
    the need for a shuffle service. This mode works by tracking which stages
    generate shuffle files, and keeping executors that generate data for those
    shuffles alive while the jobs that use them are active.
    
    A separate timeout is also added for shuffle data, so that executors that
    hold shuffle data can use a separate timeout before being removed because
    of being idle. This allows the shuffle data to be kept around in case it
    is needed by some new job, or allows users to be more aggressive in timing
    out executors that don't have shuffle data in active use.
    
    The code also hooks up to the context cleaner so that shuffles that are
    garbage collected are detected, and the respective executors are not held
    unnecessarily.
    
    Testing done with added unit tests, and also with TPC-DS workloads on
    YARN without a shuffle service.
    
    Closes #24817 from vanzin/SPARK-27963.
    
    Authored-by: Marcelo Vanzin <va...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  16 +-
 .../main/scala/org/apache/spark/SparkContext.scala |  20 +-
 .../org/apache/spark/internal/config/package.scala |  11 +
 .../org/apache/spark/scheduler/StageInfo.scala     |  10 +-
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 225 ++++++++++++++++++++-
 .../spark/ExecutorAllocationManagerSuite.scala     |   2 +-
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  | 132 +++++++++++-
 docs/configuration.md                              |  20 ++
 8 files changed, 407 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index bceb26c..5114cf7 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
     conf: SparkConf,
+    cleaner: Option[ContextCleaner] = None,
     clock: Clock = new SystemClock())
   extends Logging {
 
@@ -148,7 +149,7 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   val listener = new ExecutorAllocationListener
 
-  val executorMonitor = new ExecutorMonitor(conf, client, clock)
+  val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock)
 
   // Executor that handles the scheduling task.
   private val executor =
@@ -194,11 +195,13 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
     }
-    // Require external shuffle service for dynamic allocation
-    // Otherwise, we may lose shuffle files when killing executors
-    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
-      throw new SparkException("Dynamic allocation of executors requires the external " +
-        "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+    if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
+        logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
+      } else if (!testing) {
+        throw new SparkException("Dynamic allocation of executors requires the external " +
+          "shuffle service. You may enable this through spark.shuffle.service.enabled.")
+      }
     }
 
     if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
@@ -214,6 +217,7 @@ private[spark] class ExecutorAllocationManager(
   def start(): Unit = {
     listenerBus.addToManagementQueue(listener)
     listenerBus.addToManagementQueue(executorMonitor)
+    cleaner.foreach(_.attachListener(executorMonitor))
 
     val scheduleTask = new Runnable() {
       override def run(): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f289c17..75182b0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -553,14 +553,22 @@ class SparkContext(config: SparkConf) extends Logging {
         None
       }
 
-    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+    _cleaner =
+      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
+        Some(new ContextCleaner(this))
+      } else {
+        None
+      }
+    _cleaner.foreach(_.start())
+
     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
     _executorAllocationManager =
       if (dynamicAllocationEnabled) {
         schedulerBackend match {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
-              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
+              cleaner = cleaner))
           case _ =>
             None
         }
@@ -569,14 +577,6 @@ class SparkContext(config: SparkConf) extends Logging {
       }
     _executorAllocationManager.foreach(_.start())
 
-    _cleaner =
-      if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
-        Some(new ContextCleaner(this))
-      } else {
-        None
-      }
-    _cleaner.foreach(_.start())
-
     setupAndStartListenerBus()
     postEnvironmentUpdate()
     postApplicationStart()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 76d3d6e..f2b88fe 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -369,6 +369,17 @@ package object config {
       .checkValue(_ >= 0L, "Timeout must be >= 0.")
       .createWithDefault(60)
 
+  private[spark] val DYN_ALLOCATION_SHUFFLE_TRACKING =
+    ConfigBuilder("spark.dynamicAllocation.shuffleTracking.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val DYN_ALLOCATION_SHUFFLE_TIMEOUT =
+    ConfigBuilder("spark.dynamicAllocation.shuffleTimeout")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(_ >= 0L, "Timeout must be >= 0.")
+      .createWithDefault(Long.MaxValue)
+
   private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT =
     ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout")
       .timeConf(TimeUnit.SECONDS).createWithDefault(1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 33a68f2..e321615 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -37,7 +37,8 @@ class StageInfo(
     val parentIds: Seq[Int],
     val details: String,
     val taskMetrics: TaskMetrics = null,
-    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty) {
+    private[spark] val taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty,
+    private[spark] val shuffleDepId: Option[Int] = None) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -90,6 +91,10 @@ private[spark] object StageInfo {
     ): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
+    val shuffleDepId = stage match {
+      case sms: ShuffleMapStage => Option(sms.shuffleDep).map(_.shuffleId)
+      case _ => None
+    }
     new StageInfo(
       stage.id,
       attemptId,
@@ -99,6 +104,7 @@ private[spark] object StageInfo {
       stage.parents.map(_.id),
       stage.details,
       taskMetrics,
-      taskLocalityPreferences)
+      taskLocalityPreferences,
+      shuffleDepId)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 9aac4d2..f5beb40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -36,14 +36,19 @@ import org.apache.spark.util.Clock
 private[spark] class ExecutorMonitor(
     conf: SparkConf,
     client: ExecutorAllocationClient,
-    clock: Clock) extends SparkListener with Logging {
+    listenerBus: LiveListenerBus,
+    clock: Clock) extends SparkListener with CleanerListener with Logging {
 
   private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(
     conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
   private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(
     conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
+  private val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT)
+
   private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
     conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+  private val shuffleTrackingEnabled = !conf.get(SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(DYN_ALLOCATION_SHUFFLE_TRACKING)
 
   private val executors = new ConcurrentHashMap[String, Tracker]()
 
@@ -64,6 +69,26 @@ private[spark] class ExecutorMonitor(
   private val nextTimeout = new AtomicLong(Long.MaxValue)
   private var timedOutExecs = Seq.empty[String]
 
+  // Active job tracking.
+  //
+  // The following state is used when an external shuffle service is not in use, and allows Spark
+  // to scale down based on whether the shuffle data stored in executors is in use.
+  //
+  // The algorithm works as follows: when jobs start, some state is kept that tracks which stages
+  // are part of that job, and which shuffle ID is attached to those stages. As tasks finish, the
+  // executor tracking code is updated to include the list of shuffles for which it's storing
+  // shuffle data.
+  //
+  // If executors hold shuffle data that is related to an active job, then the executor is
+  // considered to be in "shuffle busy" state; meaning that the executor is not allowed to be
+  // removed. If the executor has shuffle data but it doesn't relate to any active job, then it
+  // may be removed when idle, following the shuffle-specific timeout configuration.
+  //
+  // The following fields are not thread-safe and should be only used from the event thread.
+  private val shuffleToActiveJobs = new mutable.HashMap[Int, mutable.ArrayBuffer[Int]]()
+  private val stageToShuffleID = new mutable.HashMap[Int, Int]()
+  private val jobToStageIDs = new mutable.HashMap[Int, Seq[Int]]()
+
   def reset(): Unit = {
     executors.clear()
     nextTimeout.set(Long.MaxValue)
@@ -85,7 +110,7 @@ private[spark] class ExecutorMonitor(
 
       var newNextTimeout = Long.MaxValue
       timedOutExecs = executors.asScala
-        .filter { case (_, exec) => !exec.pendingRemoval }
+        .filter { case (_, exec) => !exec.pendingRemoval && !exec.hasActiveShuffle }
         .filter { case (_, exec) =>
           val deadline = exec.timeoutAt
           if (deadline > now) {
@@ -124,6 +149,109 @@ private[spark] class ExecutorMonitor(
 
   def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => exec.pendingRemoval }
 
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    if (!shuffleTrackingEnabled) {
+      return
+    }
+
+    val shuffleStages = event.stageInfos.flatMap { s =>
+      s.shuffleDepId.toSeq.map { shuffleId =>
+        s.stageId -> shuffleId
+      }
+    }
+
+    var updateExecutors = false
+    shuffleStages.foreach { case (stageId, shuffle) =>
+      val jobIDs = shuffleToActiveJobs.get(shuffle) match {
+        case Some(jobs) =>
+          // If a shuffle is being re-used, we need to re-scan the executors and update their
+          // tracker with the information that the shuffle data they're storing is in use.
+          logDebug(s"Reusing shuffle $shuffle in job ${event.jobId}.")
+          updateExecutors = true
+          jobs
+
+        case _ =>
+          logDebug(s"Registered new shuffle $shuffle (from stage $stageId).")
+          val jobs = new mutable.ArrayBuffer[Int]()
+          shuffleToActiveJobs(shuffle) = jobs
+          jobs
+      }
+      jobIDs += event.jobId
+    }
+
+    if (updateExecutors) {
+      val activeShuffleIds = shuffleStages.map(_._2).toSeq
+      var needTimeoutUpdate = false
+      val activatedExecs = new mutable.ArrayBuffer[String]()
+      executors.asScala.foreach { case (id, exec) =>
+        if (!exec.hasActiveShuffle) {
+          exec.updateActiveShuffles(activeShuffleIds)
+          if (exec.hasActiveShuffle) {
+            needTimeoutUpdate = true
+            activatedExecs += id
+          }
+        }
+      }
+
+      logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " +
+        s"needed by new job ${event.jobId}.")
+
+      if (needTimeoutUpdate) {
+        nextTimeout.set(Long.MinValue)
+      }
+    }
+
+    stageToShuffleID ++= shuffleStages
+    jobToStageIDs(event.jobId) = shuffleStages.map(_._1).toSeq
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    if (!shuffleTrackingEnabled) {
+      return
+    }
+
+    var updateExecutors = false
+    val activeShuffles = new mutable.ArrayBuffer[Int]()
+    shuffleToActiveJobs.foreach { case (shuffleId, jobs) =>
+      jobs -= event.jobId
+      if (jobs.nonEmpty) {
+        activeShuffles += shuffleId
+      } else {
+        // If a shuffle went idle we need to update all executors to make sure they're correctly
+        // tracking active shuffles.
+        updateExecutors = true
+      }
+    }
+
+    if (updateExecutors) {
+      if (log.isDebugEnabled()) {
+        if (activeShuffles.nonEmpty) {
+          logDebug(
+            s"Job ${event.jobId} ended, shuffles ${activeShuffles.mkString(",")} still active.")
+        } else {
+          logDebug(s"Job ${event.jobId} ended, no active shuffles remain.")
+        }
+      }
+
+      val deactivatedExecs = new mutable.ArrayBuffer[String]()
+      executors.asScala.foreach { case (id, exec) =>
+        if (exec.hasActiveShuffle) {
+          exec.updateActiveShuffles(activeShuffles)
+          if (!exec.hasActiveShuffle) {
+            deactivatedExecs += id
+          }
+        }
+      }
+
+      logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " +
+        s"after job ${event.jobId} finished.")
+    }
+
+    jobToStageIDs.remove(event.jobId).foreach { stages =>
+      stages.foreach { id => stageToShuffleID -= id }
+    }
+  }
+
   override def onTaskStart(event: SparkListenerTaskStart): Unit = {
     val executorId = event.taskInfo.executorId
     // Guard against a late arriving task start event (SPARK-26927).
@@ -137,6 +265,21 @@ private[spark] class ExecutorMonitor(
     val executorId = event.taskInfo.executorId
     val exec = executors.get(executorId)
     if (exec != null) {
+      // If the task succeeded and the stage generates shuffle data, record that this executor
+      // holds data for the shuffle. This code will track all executors that generate shuffle
+      // for the stage, even if speculative tasks generate duplicate shuffle data and end up
+      // being ignored by the map output tracker.
+      //
+      // This means that an executor may be marked as having shuffle data, and thus prevented
+      // from being removed, even though the data may not be used.
+      if (shuffleTrackingEnabled && event.reason == Success) {
+        stageToShuffleID.get(event.stageId).foreach { shuffleId =>
+          exec.addShuffle(shuffleId)
+        }
+      }
+
+      // Update the number of running tasks after checking for shuffle data, so that the shuffle
+      // information is up-to-date in case the executor is going idle.
       exec.updateRunningTasks(-1)
     }
   }
@@ -171,7 +314,6 @@ private[spark] class ExecutorMonitor(
     // available. So don't count blocks that can be served by the external service.
     if (storageLevel.isValid && (!fetchFromShuffleSvcEnabled || !storageLevel.useDisk)) {
       val hadCachedBlocks = exec.cachedBlocks.nonEmpty
-
       val blocks = exec.cachedBlocks.getOrElseUpdate(blockId.rddId,
         new mutable.BitSet(blockId.splitIndex))
       blocks += blockId.splitIndex
@@ -201,6 +343,25 @@ private[spark] class ExecutorMonitor(
     }
   }
 
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case ShuffleCleanedEvent(id) => cleanupShuffle(id)
+    case _ =>
+  }
+
+  override def rddCleaned(rddId: Int): Unit = { }
+
+  override def shuffleCleaned(shuffleId: Int): Unit = {
+    // Called in a separate thread, so post a custom event to the listener bus to update the
+    // internal state safely. Skip it when tracking is off: trackers then have no shuffle set.
+    if (shuffleTrackingEnabled) listenerBus.post(ShuffleCleanedEvent(shuffleId))
+  }
+
+  override def broadcastCleaned(broadcastId: Long): Unit = { }
+
+  override def accumCleaned(accId: Long): Unit = { }
+
+  override def checkpointCleaned(rddId: Long): Unit = { }
+
   // Visible for testing.
   private[dynalloc] def isExecutorIdle(id: String): Boolean = {
     Option(executors.get(id)).map(_.isIdle).getOrElse(throw new NoSuchElementException(id))
@@ -209,7 +370,7 @@ private[spark] class ExecutorMonitor(
   // Visible for testing
   private[dynalloc] def timedOutExecutors(when: Long): Seq[String] = {
     executors.asScala.flatMap { case (id, tracker) =>
-      if (tracker.timeoutAt <= when) Some(id) else None
+      if (tracker.isIdle && tracker.timeoutAt <= when) Some(id) else None
     }.toSeq
   }
 
@@ -236,6 +397,14 @@ private[spark] class ExecutorMonitor(
     }
   }
 
+  private def cleanupShuffle(id: Int): Unit = {
+    logDebug(s"Cleaning up state related to shuffle $id.")
+    shuffleToActiveJobs -= id
+    executors.asScala.foreach { case (_, exec) =>
+      exec.removeShuffle(id)
+    }
+  }
+
   private class Tracker {
     @volatile var timeoutAt: Long = Long.MaxValue
 
@@ -244,6 +413,7 @@ private[spark] class ExecutorMonitor(
     @volatile var timedOut: Boolean = false
 
     var pendingRemoval: Boolean = false
+    var hasActiveShuffle: Boolean = false
 
     private var idleStart: Long = -1
     private var runningTasks: Int = 0
@@ -252,8 +422,11 @@ private[spark] class ExecutorMonitor(
     // This should only be used in the event thread.
     val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]()
 
-    // For testing.
-    def isIdle: Boolean = idleStart >= 0
+    // The set of shuffles for which shuffle data is held by the executor.
+    // This should only be used in the event thread.
+    private val shuffleIds = if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null
+
+    def isIdle: Boolean = idleStart >= 0 && !hasActiveShuffle
 
     def updateRunningTasks(delta: Int): Unit = {
       runningTasks = math.max(0, runningTasks + delta)
@@ -264,7 +437,18 @@ private[spark] class ExecutorMonitor(
     def updateTimeout(): Unit = {
       val oldDeadline = timeoutAt
       val newDeadline = if (idleStart >= 0) {
-        idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else idleTimeoutMs)
+        // Cached blocks and shuffle data each have their own timeout; the shorter one applies.
+        val hasShuffleData = shuffleIds != null && shuffleIds.nonEmpty
+        val timeout = if (cachedBlocks.nonEmpty || hasShuffleData) {
+          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue
+          val _shuffleTimeout = if (hasShuffleData) shuffleTimeoutMs else Long.MaxValue
+          math.min(_cacheTimeout, _shuffleTimeout)
+        } else {
+          idleTimeoutMs
+        }
+        // shuffleTimeoutMs defaults to Long.MaxValue, so saturate instead of overflowing into a
+        // negative deadline, which would make the executor time out immediately.
+        if (timeout > Long.MaxValue - idleStart) Long.MaxValue else idleStart + timeout
       } else {
         Long.MaxValue
       }
@@ -279,5 +463,32 @@ private[spark] class ExecutorMonitor(
         updateNextTimeout(newDeadline)
       }
     }
+
+    def addShuffle(id: Int): Unit = {
+      if (shuffleIds.add(id)) {
+        hasActiveShuffle = true
+      }
+    }
+
+    def removeShuffle(id: Int): Unit = {
+      if (shuffleIds.remove(id) && shuffleIds.isEmpty) {
+        hasActiveShuffle = false
+        if (isIdle) {
+          updateTimeout()
+        }
+      }
+    }
+
+    def updateActiveShuffles(ids: Iterable[Int]): Unit = {
+      val hadActiveShuffle = hasActiveShuffle
+      hasActiveShuffle = ids.exists(shuffleIds.contains)
+      if (hadActiveShuffle && isIdle) {
+        updateTimeout()
+      }
+    }
+  }
+
+  private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent {
+    override protected[spark] def logEvent: Boolean = false
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 3ba33e3..191b516 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -1008,7 +1008,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
   private def createManager(
       conf: SparkConf,
       clock: Clock = new SystemClock()): ExecutorAllocationManager = {
-    val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock)
+    val manager = new ExecutorAllocationManager(client, listenerBus, conf, clock = clock)
     managers += manager
     manager.start()
     manager
diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index 8d1577e..e11ee97 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, when}
+import org.mockito.Mockito.{doAnswer, mock, when}
 
 import org.apache.spark._
 import org.apache.spark.internal.config._
@@ -34,10 +34,13 @@ class ExecutorMonitorSuite extends SparkFunSuite {
 
   private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L)
   private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L)
+  private val shuffleTimeoutMs = TimeUnit.SECONDS.toMillis(240L)
 
   private val conf = new SparkConf()
     .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
     .set(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, "120s")
+    .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT.key, "240s")
+    .set(SHUFFLE_SERVICE_ENABLED, true)
 
   private var monitor: ExecutorMonitor = _
   private var client: ExecutorAllocationClient = _
@@ -55,7 +58,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     when(client.isExecutorActive(any())).thenAnswer { invocation =>
       knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String])
     }
-    monitor = new ExecutorMonitor(conf, client, clock)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
   }
 
   test("basic executor timeout") {
@@ -205,7 +208,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(storageDeadline) ===  Seq("1"))
 
     conf.set(SHUFFLE_SERVICE_ENABLED, true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
-    monitor = new ExecutorMonitor(conf, client, clock)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
 
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
     monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.MEMORY_ONLY))
@@ -259,8 +262,119 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors().toSet === Set("2"))
   }
 
+  test("shuffle block tracking") {
+    val bus = mockListenerBus()
+    conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, bus, clock)
+
+    // 3 jobs: 2 and 3 share a shuffle, 1 has a separate shuffle.
+    val stage1 = stageInfo(1, shuffleId = 0)
+    val stage2 = stageInfo(2)
+
+    val stage3 = stageInfo(3, shuffleId = 1)
+    val stage4 = stageInfo(4)
+
+    val stage5 = stageInfo(5, shuffleId = 1)
+    val stage6 = stageInfo(6)
+
+    // Start jobs 1 and 2. Finish a task on each, but don't finish the jobs. This should prevent the
+    // executor from going idle since there are active shuffles.
+    monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2)))
+    monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4)))
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    // First a failed task, to make sure it does not count.
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", TaskResultLost, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    // Finish the jobs, now the executor should be idle, but with the shuffle timeout, since the
+    // shuffles are not active.
+    monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+    assert(!monitor.isExecutorIdle("1"))
+
+    monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Start job 3. Since it shares a shuffle with job 2, the executor should not be considered
+    // idle anymore, even if no tasks are run.
+    monitor.onJobStart(SparkListenerJobStart(3, clock.getTimeMillis(), Seq(stage5, stage6)))
+    assert(!monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(shuffleDeadline).isEmpty)
+
+    monitor.onJobEnd(SparkListenerJobEnd(3, clock.getTimeMillis(), JobSucceeded))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(shuffleDeadline) === Seq("1"))
+
+    // Clean up the shuffles; the executor should now time out at the idle deadline.
+    monitor.shuffleCleaned(0)
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    monitor.shuffleCleaned(1)
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+  }
+
+  test("shuffle tracking with multiple executors and concurrent jobs") {
+    val bus = mockListenerBus()
+    conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, bus, clock)
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "2", null))
+
+    // Two separate jobs with separate shuffles. The first job will only run tasks on
+    // executor 1, the second on executor 2. Ensures that jobs finishing don't affect
+    // executors that are active in other jobs.
+
+    val stage1 = stageInfo(1, shuffleId = 0)
+    val stage2 = stageInfo(2)
+    monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage1, stage2)))
+
+    val stage3 = stageInfo(3, shuffleId = 1)
+    val stage4 = stageInfo(4)
+    monitor.onJobStart(SparkListenerJobStart(2, clock.getTimeMillis(), Seq(stage3, stage4)))
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("2"))
+
+    monitor.onTaskStart(SparkListenerTaskStart(3, 0, taskInfo("2", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(3, 0, "foo", Success, taskInfo("2", 1), null))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+    assert(monitor.isExecutorIdle("1"))
+    assert(!monitor.isExecutorIdle("2"))
+
+    monitor.onJobEnd(SparkListenerJobEnd(2, clock.getTimeMillis(), JobSucceeded))
+    assert(monitor.isExecutorIdle("2"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+
+    monitor.shuffleCleaned(0)
+    monitor.shuffleCleaned(1)
+    assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2"))
+  }
+
   private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
   private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
+  private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
+
+  private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = {
+    new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "",
+      shuffleDepId = if (shuffleId >= 0) Some(shuffleId) else None)
+  }
 
   private def taskInfo(
       execId: String,
@@ -286,4 +400,16 @@ class ExecutorMonitorSuite extends SparkFunSuite {
         RDDBlockId(rddId, splitIndex), level, 1L, 0L))
   }
 
+  /**
+   * Mock the listener bus *only* for the functionality needed by the shuffle tracking code.
+   * Any other event sent through the mock bus will fail.
+   */
+  private def mockListenerBus(): LiveListenerBus = {
+    val bus = mock(classOf[LiveListenerBus])
+    doAnswer { invocation =>
+      monitor.onOtherEvent(invocation.getArguments()(0).asInstanceOf[SparkListenerEvent])
+    }.when(bus).post(any())
+    bus
+  }
+
 }
diff --git a/docs/configuration.md b/docs/configuration.md
index 06b0408..10886241 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -2114,6 +2114,26 @@ Apart from these, the following properties are also available, and may be useful
     <a href="job-scheduling.html#resource-allocation-policy">description</a>.
   </td>
 </tr>
+<tr>
+  <td><code>spark.dynamicAllocation.shuffleTracking.enabled</code></td>
+  <td><code>false</code></td>
+  <td>
+    Experimental. Enables shuffle file tracking for executors, which allows dynamic allocation
+    without the need for an external shuffle service. This option will try to keep alive executors
+    that are storing shuffle data for active jobs.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.dynamicAllocation.shuffleTimeout</code></td>
+  <td><code>infinity</code></td>
+  <td>
+    When shuffle tracking is enabled, controls the timeout for executors that are holding shuffle
+    data. The default value means that Spark will rely on the shuffles being garbage collected to be
+    able to release executors. If for some reason garbage collection is not cleaning up shuffles
+    quickly enough, this option can be used to control when to time out executors even when they are
+    storing shuffle data.
+  </td>
+</tr>
 </table>
 
 ### Thread Configurations


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org