You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by js...@apache.org on 2017/10/09 06:17:12 UTC

spark git commit: [SPARK-22074][CORE] Task killed by other attempt task should not be resubmitted

Repository: spark
Updated Branches:
  refs/heads/master c998a2ae0 -> fe7b219ae


[SPARK-22074][CORE] Task killed by other attempt task should not be resubmitted

## What changes were proposed in this pull request?

As the detail scenario described in [SPARK-22074](https://issues.apache.org/jira/browse/SPARK-22074), unnecessary resubmitted may cause stage hanging in currently release versions. This patch add a new var in TaskInfo to mark this task killed by other attempt or not.

## How was this patch tested?

Add a new UT `[SPARK-22074] Task killed by other attempt task should not be resubmitted` in TaskSetManagerSuite, this UT recreate the scenario in JIRA description, it failed without the changes in this PR and passed conversely.

Author: Yuanjian Li <xy...@gmail.com>

Closes #19287 from xuanyuanking/SPARK-22074.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe7b219a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe7b219a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe7b219a

Branch: refs/heads/master
Commit: fe7b219ae3e8a045655a836cbb77219036ec5740
Parents: c998a2a
Author: Yuanjian Li <xy...@gmail.com>
Authored: Mon Oct 9 14:16:25 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Mon Oct 9 14:16:25 2017 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/TaskSetManager.scala |   8 +-
 .../org/apache/spark/scheduler/FakeTask.scala   |  20 +++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 107 +++++++++++++++++++
 3 files changed, 132 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe7b219a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3bdede6..de4711f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,6 +83,11 @@ private[spark] class TaskSetManager(
   val successful = new Array[Boolean](numTasks)
   private val numFailures = new Array[Int](numTasks)
 
+  // Set the coresponding index of Boolean var when the task killed by other attempt tasks,
+  // this happened while we set the `spark.speculation` to true. The task killed by others
+  // should not resubmit while executor lost.
+  private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+
   val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
   private[scheduler] var tasksSuccessful = 0
 
@@ -729,6 +734,7 @@ private[spark] class TaskSetManager(
       logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
         s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
         s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+      killedByOtherAttempt(index) = true
       sched.backend.killTask(
         attemptInfo.taskId,
         attemptInfo.executorId,
@@ -915,7 +921,7 @@ private[spark] class TaskSetManager(
         && !isZombie) {
       for ((tid, info) <- taskInfos if info.executorId == execId) {
         val index = taskInfos(tid).index
-        if (successful(index)) {
+        if (successful(index) && !killedByOtherAttempt(index)) {
           successful(index) = false
           copiesRunning(index) -= 1
           tasksSuccessful -= 1

http://git-wip-us.apache.org/repos/asf/spark/blob/fe7b219a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index fe6de2b..109d4a0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -19,8 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.TaskContext
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.executor.TaskMetrics
 
 class FakeTask(
@@ -58,4 +57,21 @@ object FakeTask {
     }
     new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
   }
+
+  def createShuffleMapTaskSet(
+      numTasks: Int,
+      stageId: Int,
+      stageAttemptId: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
+    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+      throw new IllegalArgumentException("Wrong number of task locations")
+    }
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new ShuffleMapTask(stageId, stageAttemptId, null, new Partition {
+        override def index: Int = i
+      }, prefLocs(i), new Properties,
+        SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
+    }
+    new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fe7b219a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 5c712bd..2ce81ae 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -744,6 +744,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(resubmittedTasks === 0)
   }
 
+
+  test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.5")
+    sc.conf.set("spark.speculation", "true")
+
+    var killTaskCalled = false
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+      ("exec2", "host2"), ("exec3", "host3"))
+    sched.initialize(new FakeSchedulerBackend() {
+      override def killTask(
+          taskId: Long,
+          executorId: String,
+          interruptThread: Boolean,
+          reason: String): Unit = {
+        // Check the only one killTask event in this case, which triggered by
+        // task 2.1 completed.
+        assert(taskId === 2)
+        assert(executorId === "exec3")
+        assert(interruptThread)
+        assert(reason === "another attempt succeeded")
+        killTaskCalled = true
+      }
+    })
+
+    // Keep track of the number of tasks that are resubmitted,
+    // so that the test can check that no tasks were resubmitted.
+    var resubmittedTasks = 0
+    val dagScheduler = new FakeDAGScheduler(sc, sched) {
+      override def taskEnded(
+          task: Task[_],
+          reason: TaskEndReason,
+          result: Any,
+          accumUpdates: Seq[AccumulatorV2[_, _]],
+          taskInfo: TaskInfo): Unit = {
+        super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+        reason match {
+          case Resubmitted => resubmittedTasks += 1
+          case _ =>
+        }
+      }
+    }
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host1", "exec1")),
+      Seq(TaskLocation("host3", "exec3")),
+      Seq(TaskLocation("host2", "exec2")))
+
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((exec, host) <- Seq(
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec3" -> "host3",
+      "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === exec)
+      // Add an extra assert to make sure task 2.0 is running on exec3
+      if (task.index == 2) {
+        assert(task.attemptNumber === 0)
+        assert(task.executorId === "exec3")
+      }
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 2 tasks and leave 2 task in running
+    for (id <- Set(0, 1)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+    // Offer resource to start the speculative attempt for the running task 2.0
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    assert(taskOption.isDefined)
+    val task4 = taskOption.get
+    assert(task4.index === 2)
+    assert(task4.taskId === 4)
+    assert(task4.executorId === "exec2")
+    assert(task4.attemptNumber === 1)
+    // Complete the speculative attempt for the running task
+    manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
+    // Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
+    assert(killTaskCalled)
+    // Host 3 Losts, there's only task 2.0 on it, which killed by task 2.1
+    manager.executorLost("exec3", "host3", SlaveLost())
+    // Check the resubmittedTasks
+    assert(resubmittedTasks === 0)
+  }
+
   test("speculative and noPref task should be scheduled after node-local") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org