spark git commit: [SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success

Repository: spark
Updated Branches:
  refs/heads/master ee5a5a092 -> 5828f41a5

[SPARK-13343] speculative tasks that didn't commit shouldn't be marked as success

Currently Speculative tasks that didn't commit can show up as success (depending on timing of commit). This is a bit confusing because that task didn't really succeed in the sense it didn't write anything.
I think these tasks should be marked as KILLED or something that is more obvious to the user exactly what happened. it is happened to hit the timing where it got a commit denied exception then it shows up as failed and counts against your task failures. It shouldn't count against task failures since that failure really doesn't matter.
MapReduce handles these situation so perhaps we can look there for a model.

<img width="1420" alt="unknown" src="">

**How can this issue happen?**
When both attempts of a task finish before the driver sends command to kill one of them, both of them send the status update FINISHED to the driver. The driver calls TaskSchedulerImpl to handle one successful task at a time. When it handles the first successful task, it sends the command to kill the other copy of the task, however, because that task is already finished, the executor will ignore the command. After finishing handling the first attempt, it processes the second one, although all actions on the result of this task are skipped, this copy of the task is still marked as SUCCESS. As a result, even though this issue does not affect the result of the job, it might cause confusing to user because both of them appear to be successful.

**How does this PR fix the issue?**
The simple way to fix this issue is that when taskSetManager handles successful task, it checks if any other attempt succeeded. If this is the case, it will call handleFailedTask with state==KILLED and reason==TaskKilled(“another attempt succeeded”) to handle this task as begin killed.

**How was this patch tested?**
I tested this manually by running applications, that caused the issue before, a few times, and observed that the issue does not happen again. Also, I added a unit test in TaskSetManagerSuite to test that if we call handleSuccessfulTask to handle status update for 2 copies of a task, only the one that is handled first will be mark as SUCCESS

Author: Hieu Huynh <“”>
Author: hthuynh2 <>

Closes #21653 from hthuynh2/SPARK_13343.


Branch: refs/heads/master
Commit: 5828f41a52c446b774a909e96eff8d8c5831b394
Parents: ee5a5a0
Author: Hieu Huynh <“”>
Authored: Fri Jul 27 12:34:14 2018 -0500
Committer: Thomas Graves <>
Committed: Fri Jul 27 12:34:14 2018 -0500

 .../apache/spark/scheduler/TaskSetManager.scala | 19 +++++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 70 ++++++++++++++++++++
 2 files changed, 88 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 0b21256..8b77641 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.SchedulingMode._
-import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
@@ -728,6 +728,23 @@ private[spark] class TaskSetManager(
   def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
     val info = taskInfos(tid)
     val index = info.index
+    // Check if any other attempt succeeded before this and this attempt has not been handled
+    if (successful(index) && killedByOtherAttempt.contains(tid)) {
+      // Undo the effect on calculatedTasks and totalResultSize made earlier when
+      // checking if can fetch more results
+      calculatedTasks -= 1
+      val resultSizeAcc = result.accumUpdates.find(a =>
+ == Some(InternalAccumulator.RESULT_SIZE))
+      if (resultSizeAcc.isDefined) {
+        totalResultSize -= resultSizeAcc.get.asInstanceOf[LongAccumulator].value
+      }
+      // Handle this task as a killed task
+      handleFailedTask(tid, TaskState.KILLED,
+        TaskKilled("Finish but did not commit due to another attempt succeeded"))
+      return
+    }
     info.markFinished(TaskState.FINISHED, clock.getTimeMillis())
     if (speculationEnabled) {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 206b9f4..cf05434 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1532,4 +1532,74 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     val valueSer = SparkEnv.get.serializer.newInstance()
     new DirectTaskResult[Int](valueSer.serialize(id), accumUpdates)
+  test("SPARK-13343 speculative tasks that didn't commit shouldn't be marked as success") {
+    sc = new SparkContext("local", "test")
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val taskSet = FakeTask.createTaskSet(4)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation", "true")
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = { task =>
+      task.metrics.internalAccums
+    }
+    // Offer resources for 4 tasks to start
+    for ((k, v) <- List(
+        "exec1" -> "host1",
+        "exec1" -> "host1",
+        "exec2" -> "host2",
+        "exec2" -> "host2")) {
+      val taskOption = manager.resourceOffer(k, v, NO_PREF)
+      assert(taskOption.isDefined)
+      val task = taskOption.get
+      assert(task.executorId === k)
+    }
+    assert(sched.startedTasks.toSet === Set(0, 1, 2, 3))
+    clock.advance(1)
+    // Complete the 3 tasks and leave 1 task in running
+    for (id <- Set(0, 1, 2)) {
+      manager.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+    // checkSpeculatableTasks checks that the task runtime is greater than the threshold for
+    // speculating. Since we use a threshold of 0 for speculation, tasks need to be running for
+    // > 0ms, so advance the clock by 1ms here.
+    clock.advance(1)
+    assert(manager.checkSpeculatableTasks(0))
+    assert(sched.speculativeTasks.toSet === Set(3))
+    // Offer resource to start the speculative attempt for the running task
+    val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)
+    assert(taskOption5.isDefined)
+    val task5 = taskOption5.get
+    assert(task5.index === 3)
+    assert(task5.taskId === 4)
+    assert(task5.executorId === "exec1")
+    assert(task5.attemptNumber === 1)
+    sched.backend = mock(classOf[SchedulerBackend])
+    sched.dagScheduler.stop()
+    sched.dagScheduler = mock(classOf[DAGScheduler])
+    // Complete one attempt for the running task
+    val result = createTaskResult(3, accumUpdatesByTask(3))
+    manager.handleSuccessfulTask(3, result)
+    // There is a race between the scheduler asking to kill the other task, and that task
+    // actually finishing. We simulate what happens if the other task finishes before we kill it.
+    verify(sched.backend).killTask(4, "exec1", true, "another attempt succeeded")
+    manager.handleSuccessfulTask(4, result)
+    val info3 = manager.taskInfos(3)
+    val info4 = manager.taskInfos(4)
+    assert(info3.successful)
+    assert(info4.killed)
+    verify(sched.dagScheduler).taskEnded(
+      manager.tasks(3),
+      TaskKilled("Finish but did not commit due to another attempt succeeded"),
+      null,
+      Seq.empty,
+      info4)
+    verify(sched.dagScheduler).taskEnded(manager.tasks(3), Success, result.value(),
+      result.accumUpdates, info3)
+  }

