You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2018/07/19 14:52:13 UTC

spark git commit: [SPARK-24755][CORE] Executor loss can cause task to not be resubmitted

Repository: spark
Updated Branches:
  refs/heads/master 6a9a058e0 -> 8d707b060


[SPARK-24755][CORE] Executor loss can cause task to not be resubmitted

**Description**
As described in [SPARK-24755](https://issues.apache.org/jira/browse/SPARK-24755), when speculation is enabled, there is a scenario in which executor loss can cause a task to not be resubmitted.
This patch changes the variable killedByOtherAttempt to keep track of the taskIds of the tasks that are killed by another attempt. By doing this, we can still avoid resubmitting a task that was killed by another attempt, while still resubmitting the successful attempt when its executor is lost.

**How was this patch tested?**
A unit test is added, based on the unit test written by xuanyuanking and modified to simulate the scenario described in SPARK-24755.

Author: Hieu Huynh <“Hieu.huynh@oath.com”>

Closes #21729 from hthuynh2/SPARK_24755.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8d707b06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8d707b06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8d707b06

Branch: refs/heads/master
Commit: 8d707b06003bc97d06630b22e6ae7c35f99b3cdd
Parents: 6a9a058
Author: Hieu Huynh <“Hieu.huynh@oath.com”>
Authored: Thu Jul 19 09:52:07 2018 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Thu Jul 19 09:52:07 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 112 +++++++++++++++++++
 2 files changed, 117 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8d707b06/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6071605..defed1e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -84,10 +84,10 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
-  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
-  // this happened while we set the `spark.speculation` to true. The task killed by others
+  // Add the tid of task into this HashSet when the task is killed by other attempt tasks.
+  // This happened while we set the `spark.speculation` to true. The task killed by others
   // should not resubmit while executor lost.
-  private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+  private val killedByOtherAttempt = new HashSet[Long]
 
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   private[scheduler] var tasksSuccessful = 0
@@ -735,7 +735,7 @@ private[spark] class TaskSetManager(
       logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
         s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
         s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
-      killedByOtherAttempt(index) = true
+      killedByOtherAttempt += attemptInfo.taskId
       sched.backend.killTask(
         attemptInfo.taskId,
         attemptInfo.executorId,
@@ -947,7 +947,7 @@ private[spark] class TaskSetManager(
         && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
-        if (successful(index) && !killedByOtherAttempt(index)) {
+        if (successful(index) && !killedByOtherAttempt.contains(tid)) {
           successful(index) = false
           copiesRunning(index) -= 1
           tasksSuccessful -= 1

http://git-wip-us.apache.org/repos/asf/spark/blob/8d707b06/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ae571e5..206b9f4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1414,6 +1414,118 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     taskSetManager2.checkSpeculatableTasks(0)
   }
 
+
+  test("SPARK-24755 Executor loss can cause task to not be resubmitted") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+
+    sc.conf.set("spark.speculation.quantile", "0.5")
+    sc.conf.set("spark.speculation", "true")
+
+    var killTaskCalled = false
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(
+          taskId: Long,
+          executorId: String,
+          interruptThread: Boolean,
+          reason: String): Unit = {
+        // Check the only one killTask event in this case, which triggered by
+        // task 2.1 completed.
+        assert(taskId === 2)
+        assert(executorId === "exec3")
+        assert(interruptThread)
+        assert(reason === "another attempt succeeded")
+        killTaskCalled = true
+      }
+    })
+
+    // Keep track of the index of tasks that are resubmitted,
+    // so that the test can check that task is resubmitted correctly
+    var resubmittedTasks = new mutable.HashSet[Int]
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += taskInfo.index
+          case _ =>
+        }
+      }
+    }
+    sched.dagScheduler.stop()
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host3", "exec3")),
+      Seq(TaskLocation("host2", "exec2")))
+
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((exec, host) <- Seq(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec3" -> "host3",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === exec)
+      // Add an extra assert to make sure task 2.0 is running on exec3
+      if (task.index == 2) {
+        assert(task.attemptNumber === 0)
+        assert(task.executorId === "exec3")
+      }
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 2 tasks and leave 2 task in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+    // Offer resource to start the speculative attempt for the running task 2.0
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    assert(taskOption.isDefined)
+    val task4 = taskOption.get
+    assert(task4.index === 2)
+    assert(task4.taskId === 4)
+    assert(task4.executorId === "exec2")
+    assert(task4.attemptNumber === 1)
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
+    // Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
+    assert(killTaskCalled)
+
+    assert(resubmittedTasks.isEmpty)
+    // Host 2 Losts, meaning we lost the map output task4
+    manager.executorLost("exec2", "host2", SlaveLost())
+    // Make sure that task with index 2 is re-submitted
+    assert(resubmittedTasks.contains(2))
+
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org