You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ho...@apache.org on 2020/07/17 23:11:56 UTC

[spark] branch master updated: [SPARK-21040][CORE] Speculate tasks which are running on decommission executors

This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0678afe  [SPARK-21040][CORE] Speculate tasks which are running on decommission executors
0678afe is described below

commit 0678afe393b1e4f65b70470483fe0cdb1fe139dc
Author: Prakhar Jain <pr...@gmail.com>
AuthorDate: Fri Jul 17 16:11:02 2020 -0700

    [SPARK-21040][CORE] Speculate tasks which are running on decommission executors
    
    ### What changes were proposed in this pull request?
    This PR adds functionality to consider the tasks running on decommissioned executors for speculation, based on a new config.
    In Spark-on-cloud deployments, we sometimes already know that an executor won't be alive for more than a fixed amount of time. For example, on AWS Spot nodes, once we get the notification, we know that the node will be gone in 120 seconds.
    So if the tasks running on the decommissioning executors may run beyond currentTime+120 seconds, then they are candidates for speculation.
    
    ### Why are the changes needed?
    Currently, when an executor is decommissioned, we stop scheduling new tasks on that executor, but the already running tasks keep running on it. Depending on the cloud, we might know beforehand that an executor won't be alive for more than a preconfigured time. Different cloud providers give different timeouts before they take away the nodes. For example, in the case of AWS spot nodes, an executor won't be alive for more than 120 seconds. We can utilize this information in cloud environments a [...]
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. This PR adds a new config "spark.executor.decommission.killInterval", which users can explicitly set based on the cloud environment in which they are running.
    
    ### How was this patch tested?
    Added a unit test.
    
    Closes #28619 from prakharjain09/SPARK-21040-speculate-decommission-exec-tasks.
    
    Authored-by: Prakhar Jain <pr...@gmail.com>
    Signed-off-by: Holden Karau <hk...@apple.com>
---
 .../org/apache/spark/internal/config/package.scala |  11 +++
 .../apache/spark/scheduler/TaskSetManager.scala    |  28 +++++-
 .../spark/scheduler/TaskSetManagerSuite.scala      | 106 +++++++++++++++++++++
 3 files changed, 141 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index ca75a19..f0b292b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1843,6 +1843,17 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_KILL_INTERVAL =
+    ConfigBuilder("spark.executor.decommission.killInterval")
+      .doc("Duration after which a decommissioned executor will be killed forcefully." +
+        "This config is useful for cloud environments where we know in advance when " +
+        "an executor is going to go down after decommissioning signal i.e. around 2 mins " +
+        "in aws spot nodes, 1/2 hrs in spot block nodes etc. This config is currently " +
+        "used to decide what tasks running on decommission executors to speculate.")
+      .version("3.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .createOptional
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a302f68..4b31ff0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.{ConcurrentLinkedQueue, TimeUnit}
 
 import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -102,6 +102,8 @@ private[spark] class TaskSetManager(
     }
     numTasks <= slots
   }
+  val executorDecommissionKillInterval = conf.get(EXECUTOR_DECOMMISSION_KILL_INTERVAL).map(
+    TimeUnit.SECONDS.toMillis)
 
   // For each task, tracks whether a copy of the task has succeeded. A task will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it should not
@@ -165,6 +167,7 @@ private[spark] class TaskSetManager(
 
   // Task index, start and finish time for each task attempt (indexed by task ID)
   private[scheduler] val taskInfos = new HashMap[Long, TaskInfo]
+  private[scheduler] val tidToExecutorKillTimeMapping = new HashMap[Long, Long]
 
   // Use a MedianHeap to record durations of successful tasks so we know when to launch
   // speculative tasks. This is only used when speculation is enabled, to avoid the overhead
@@ -933,6 +936,7 @@ private[spark] class TaskSetManager(
 
   /** If the given task ID is in the set of running tasks, removes it. */
   def removeRunningTask(tid: Long): Unit = {
+    tidToExecutorKillTimeMapping.remove(tid)
     if (runningTasksSet.remove(tid) && parent != null) {
       parent.decreaseRunningTasks(1)
     }
@@ -1042,7 +1046,19 @@ private[spark] class TaskSetManager(
       // bound based on that.
       logDebug("Task length threshold for speculation: " + threshold)
       for (tid <- runningTasksSet) {
-        foundTasks |= checkAndSubmitSpeculatableTask(tid, time, threshold)
+        var speculated = checkAndSubmitSpeculatableTask(tid, time, threshold)
+        if (!speculated && tidToExecutorKillTimeMapping.contains(tid)) {
+          // Check whether this task will finish before the exectorKillTime assuming
+          // it will take medianDuration overall. If this task cannot finish within
+          // executorKillInterval, then this task is a candidate for speculation
+          val taskEndTimeBasedOnMedianDuration = taskInfos(tid).launchTime + medianDuration
+          val canExceedDeadline = tidToExecutorKillTimeMapping(tid) <
+            taskEndTimeBasedOnMedianDuration
+          if (canExceedDeadline) {
+            speculated = checkAndSubmitSpeculatableTask(tid, time, 0)
+          }
+        }
+        foundTasks |= speculated
       }
     } else if (speculationTaskDurationThresOpt.isDefined && speculationTasksLessEqToSlots) {
       val time = clock.getTimeMillis()
@@ -1100,8 +1116,12 @@ private[spark] class TaskSetManager(
 
   def executorDecommission(execId: String): Unit = {
     recomputeLocality()
-    // Future consideration: if an executor is decommissioned it may make sense to add the current
-    // tasks to the spec exec queue.
+    executorDecommissionKillInterval.foreach { interval =>
+      val executorKillTime = clock.getTimeMillis() + interval
+      runningTasksSet.filter(taskInfos(_).executorId == execId).foreach { tid =>
+        tidToExecutorKillTimeMapping(tid) = executorKillTime
+      }
+    }
   }
 
   def recomputeLocality(): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 95c8197..ae51b55 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1911,6 +1911,112 @@ class TaskSetManagerSuite
     testSpeculationDurationThreshold(true, 2, 1)
   }
 
+  test("SPARK-21040: Check speculative tasks are launched when an executor is decommissioned" +
+    " and the tasks running on it cannot finish within EXECUTOR_DECOMMISSION_KILL_INTERVAL") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
+    val taskSet = FakeTask.createTaskSet(4)
+    sc.conf.set(config.SPECULATION_ENABLED, true)
+    sc.conf.set(config.SPECULATION_MULTIPLIER, 1.5)
+    sc.conf.set(config.SPECULATION_QUANTILE, 0.5)
+    sc.conf.set(config.EXECUTOR_DECOMMISSION_KILL_INTERVAL.key, "5s")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    // Start TASK 0,1 on exec1, TASK 2 on exec2
+    (0 until 2).foreach { _ =>
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
+      assert(taskOption.isDefined)
+      assert(taskOption.get.executorId === "exec1")
+    }
+    val taskOption2 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption2.isDefined)
+    assert(taskOption2.get.executorId === "exec2")
+
+    clock.advance(6*1000) // time = 6s
+    // Start TASK 3 on exec2 after some delay
+    val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
+    assert(taskOption3.isDefined)
+    assert(taskOption3.get.executorId === "exec2")
+
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+
+    clock.advance(4*1000) // time = 10s
+    // Complete the first 2 tasks and leave the other 2 tasks in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a SPECULATION_MULTIPLIER of 1.5, So tasks need to be running for
+    // > 15s for speculation
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set())
+
+    // decommission exec-2. All tasks running on exec-2 (i.e. TASK 2,3)  will be added to
+    // executorDecommissionSpeculationTriggerTimeoutOpt
+    // (TASK 2 -> 15, TASK 3 -> 15)
+    manager.executorDecommission("exec2")
+    assert(manager.tidToExecutorKillTimeMapping.keySet === Set(2, 3))
+    assert(manager.tidToExecutorKillTimeMapping(2) === 15*1000)
+    assert(manager.tidToExecutorKillTimeMapping(3) === 15*1000)
+
+    assert(manager.checkSpeculatableTasks(0))
+    // TASK 2 started at t=0s, so it can still finish before t=15s (Median task runtime = 10s)
+    // TASK 3 started at t=6s, so it might not finish before t=15s. So TASK 3 should be part
+    // of speculativeTasks
+    assert(sched.speculativeTasks.toSet === Set(3))
+    assert(manager.copiesRunning(3) === 1)
+
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption3New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    // Offer more resources. Nothing should get scheduled now.
+    assert(manager.resourceOffer("exec3", "host3", NO_PREF)._1.isEmpty)
+    assert(taskOption3New.isDefined)
+
+    // Assert info about the newly launched speculative task
+    val speculativeTask3 = taskOption3New.get
+    assert(speculativeTask3.index === 3)
+    assert(speculativeTask3.taskId === 4)
+    assert(speculativeTask3.executorId === "exec3")
+    assert(speculativeTask3.attemptNumber === 1)
+
+    clock.advance(1*1000) // time = 11s
+    // Running checkSpeculatableTasks again should return false
+    assert(!manager.checkSpeculatableTasks(0))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+
+    clock.advance(5*1000) // time = 16s
+    // At t=16s, TASK 2 has been running for 16s. It is more than the
+    // SPECULATION_MULTIPLIER * medianRuntime = 1.5 * 10 = 15s. So now TASK 2 will
+    // be selected for speculation. Here we are verifying that regular speculation configs
+    // should still take effect even when a EXECUTOR_DECOMMISSION_KILL_INTERVAL is provided and
+    // corresponding executor is decommissioned
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+    assert(manager.copiesRunning(2) === 1)
+    assert(manager.copiesRunning(3) === 2)
+    val taskOption2New = manager.resourceOffer("exec3", "host3", NO_PREF)._1
+    assert(taskOption2New.isDefined)
+    val speculativeTask2 = taskOption2New.get
+    // Ensure that TASK 2 is re-launched on exec3, host3
+    assert(speculativeTask2.index === 2)
+    assert(speculativeTask2.taskId === 5)
+    assert(speculativeTask2.executorId === "exec3")
+    assert(speculativeTask2.attemptNumber === 1)
+
+    assert(manager.copiesRunning(2) === 2)
+    assert(manager.copiesRunning(3) === 2)
+
+    // Offering additional resources should not lead to any speculative tasks being respawned
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
+  }
+
   test("SPARK-29976 Regular speculation configs should still take effect even when a " +
       "threshold is provided") {
     val (manager, clock) = testSpeculationDurationSetup(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org