Posted to commits@spark.apache.org by js...@apache.org on 2017/10/09 06:17:12 UTC
spark git commit: [SPARK-22074][CORE] Task killed by other attempt task should not be resubmitted
Repository: spark
Updated Branches:
refs/heads/master c998a2ae0 -> fe7b219ae
[SPARK-22074][CORE] Task killed by other attempt task should not be resubmitted
## What changes were proposed in this pull request?
As described in detail in [SPARK-22074](https://issues.apache.org/jira/browse/SPARK-22074), an unnecessary resubmission can cause a stage to hang in current release versions. This patch adds a new flag in `TaskSetManager` that marks whether a task was killed because another attempt of the same task succeeded.
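At a high level, the patch records which tasks were killed only because a sibling attempt succeeded, and consults that record when an executor is lost. A minimal sketch of the idea, with simplified names that are not the real `TaskSetManager` API (see the diff below for the actual change):

```scala
// Minimal sketch of the guard this patch introduces; names are illustrative,
// not the real TaskSetManager API.
class ResubmissionGuard(numTasks: Int) {
  private val successful = new Array[Boolean](numTasks)
  private val killedByOtherAttempt = new Array[Boolean](numTasks)

  // Called when one attempt of task `index` succeeds and its other running
  // attempts are killed as a result (only possible with spark.speculation=true).
  def onOtherAttemptsKilled(index: Int): Unit = {
    successful(index) = true
    killedByOtherAttempt(index) = true
  }

  // An executor that ran an attempt of task `index` was lost. The task only
  // needs to be resubmitted if its successful output died with that executor,
  // not if the attempt there was merely killed by a winning sibling attempt.
  def shouldResubmit(index: Int): Boolean =
    successful(index) && !killedByOtherAttempt(index)
}
```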
## How was this patch tested?
Added a new unit test `[SPARK-22074] Task killed by other attempt task should not be resubmitted` in `TaskSetManagerSuite`. The test recreates the scenario from the JIRA description; it fails without the changes in this PR and passes with them.
Author: Yuanjian Li <xy...@gmail.com>
Closes #19287 from xuanyuanking/SPARK-22074.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe7b219a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe7b219a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe7b219a
Branch: refs/heads/master
Commit: fe7b219ae3e8a045655a836cbb77219036ec5740
Parents: c998a2a
Author: Yuanjian Li <xy...@gmail.com>
Authored: Mon Oct 9 14:16:25 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Mon Oct 9 14:16:25 2017 +0800
----------------------------------------------------------------------
.../apache/spark/scheduler/TaskSetManager.scala | 8 +-
.../org/apache/spark/scheduler/FakeTask.scala | 20 +++-
.../spark/scheduler/TaskSetManagerSuite.scala | 107 +++++++++++++++++++
3 files changed, 132 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/fe7b219a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 3bdede6..de4711f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,6 +83,11 @@ private[spark] class TaskSetManager(
val successful = new Array[Boolean](numTasks)
private val numFailures = new Array[Int](numTasks)
+ // Set the corresponding index of this Boolean array when a task is killed by another attempt,
+ // which can happen when `spark.speculation` is set to true. A task killed by another attempt
+ // should not be resubmitted when its executor is lost.
+ private val killedByOtherAttempt: Array[Boolean] = new Array[Boolean](numTasks)
+
val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
private[scheduler] var tasksSuccessful = 0
@@ -729,6 +734,7 @@ private[spark] class TaskSetManager(
logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
+ killedByOtherAttempt(index) = true
sched.backend.killTask(
attemptInfo.taskId,
attemptInfo.executorId,
@@ -915,7 +921,7 @@ private[spark] class TaskSetManager(
&& !isZombie) {
for ((tid, info) <- taskInfos if info.executorId == execId) {
val index = taskInfos(tid).index
- if (successful(index)) {
+ if (successful(index) && !killedByOtherAttempt(index)) {
successful(index) = false
copiesRunning(index) -= 1
tasksSuccessful -= 1
http://git-wip-us.apache.org/repos/asf/spark/blob/fe7b219a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index fe6de2b..109d4a0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -19,8 +19,7 @@ package org.apache.spark.scheduler
import java.util.Properties
-import org.apache.spark.SparkEnv
-import org.apache.spark.TaskContext
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
import org.apache.spark.executor.TaskMetrics
class FakeTask(
@@ -58,4 +57,21 @@ object FakeTask {
}
new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
}
+
+ def createShuffleMapTaskSet(
+ numTasks: Int,
+ stageId: Int,
+ stageAttemptId: Int,
+ prefLocs: Seq[TaskLocation]*): TaskSet = {
+ if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+ throw new IllegalArgumentException("Wrong number of task locations")
+ }
+ val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+ new ShuffleMapTask(stageId, stageAttemptId, null, new Partition {
+ override def index: Int = i
+ }, prefLocs(i), new Properties,
+ SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
+ }
+ new TaskSet(tasks, stageId, stageAttemptId, priority = 0, null)
+ }
}
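For context, a hedged usage sketch of the new `createShuffleMapTaskSet` helper, mirroring how the new test below builds its task set (one preferred location per task):

```scala
// Usage sketch: a 4-task shuffle map task set with one preferred location per
// task, as in the new TaskSetManagerSuite test below. Assumes a running
// SparkEnv, as provided by the suite's test fixtures.
val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
  Seq(TaskLocation("host1", "exec1")),
  Seq(TaskLocation("host1", "exec1")),
  Seq(TaskLocation("host3", "exec3")),
  Seq(TaskLocation("host2", "exec2")))
```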
http://git-wip-us.apache.org/repos/asf/spark/blob/fe7b219a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 5c712bd..2ce81ae 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -744,6 +744,113 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
assert(resubmittedTasks === 0)
}
+
+ test("[SPARK-22074] Task killed by other attempt task should not be resubmitted") {
+ val conf = new SparkConf().set("spark.speculation", "true")
+ sc = new SparkContext("local", "test", conf)
+ // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+ sc.conf.set("spark.speculation.multiplier", "0.0")
+ sc.conf.set("spark.speculation.quantile", "0.5")
+ sc.conf.set("spark.speculation", "true")
+
+ var killTaskCalled = false
+ val sched = new FakeTaskScheduler(sc, ("exec1", "host1"),
+ ("exec2", "host2"), ("exec3", "host3"))
+ sched.initialize(new FakeSchedulerBackend() {
+ override def killTask(
+ taskId: Long,
+ executorId: String,
+ interruptThread: Boolean,
+ reason: String): Unit = {
+ // Check that this is the only killTask event in this test, triggered by
+ // task 2.1 completing.
+ assert(taskId === 2)
+ assert(executorId === "exec3")
+ assert(interruptThread)
+ assert(reason === "another attempt succeeded")
+ killTaskCalled = true
+ }
+ })
+
+ // Keep track of the number of tasks that are resubmitted,
+ // so that the test can check that no tasks were resubmitted.
+ var resubmittedTasks = 0
+ val dagScheduler = new FakeDAGScheduler(sc, sched) {
+ override def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Seq[AccumulatorV2[_, _]],
+ taskInfo: TaskInfo): Unit = {
+ super.taskEnded(task, reason, result, accumUpdates, taskInfo)
+ reason match {
+ case Resubmitted => resubmittedTasks += 1
+ case _ =>
+ }
+ }
+ }
+ sched.setDAGScheduler(dagScheduler)
+
+ val taskSet = FakeTask.createShuffleMapTaskSet(4, 0, 0,
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host1", "exec1")),
+ Seq(TaskLocation("host3", "exec3")),
+ Seq(TaskLocation("host2", "exec2")))
+
+ val clock = new ManualClock()
+ val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+ val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet.tasks.map { task =>
+ task.metrics.internalAccums
+ }
+ // Offer resources for 4 tasks to start
+ for ((exec, host) <- Seq(
+ "exec1" -> "host1",
+ "exec1" -> "host1",
+ "exec3" -> "host3",
+ "exec2" -> "host2")) {
+ val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+ assert(taskOption.isDefined)
+ val task = taskOption.get
+ assert(task.executorId === exec)
+ // Add an extra assert to make sure task 2.0 is running on exec3
+ if (task.index == 2) {
+ assert(task.attemptNumber === 0)
+ assert(task.executorId === "exec3")
+ }
+ }
+ assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+ clock.advance(1)
+ // Complete 2 of the tasks and leave the other 2 running
+ for (id <- Set(0, 1)) {
+ manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+ assert(sched.endedTasks(id) === Success)
+ }
+
+ // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+ // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+ // > 0ms, so advance the clock by 1ms here.
+ clock.advance(1)
+ assert(manager.checkSpeculatableTasks(0))
+ assert(sched.speculativeTasks.toSet === Set(2, 3))
+
+ // Offer a resource to start the speculative attempt for the running task 2.0
+ val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+ assert(taskOption.isDefined)
+ val task4 = taskOption.get
+ assert(task4.index === 2)
+ assert(task4.taskId === 4)
+ assert(task4.executorId === "exec2")
+ assert(task4.attemptNumber === 1)
+ // Complete the speculative attempt for the running task
+ manager.handleSuccessfulTask(4, createTaskResult(2, accumUpdatesByTask(2)))
+ // Make sure schedBackend.killTask(2, "exec3", true, "another attempt succeeded") gets called
+ assert(killTaskCalled)
+ // Host 3 is lost; the only task on it was 2.0, which was killed by task 2.1
+ manager.executorLost("exec3", "host3", SlaveLost())
+ // Check that no task was resubmitted
+ assert(resubmittedTasks === 0)
+ }
+
test("speculative and noPref task should be scheduled after node-local") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(