Posted to commits@spark.apache.org by tg...@apache.org on 2018/07/18 18:24:48 UTC

spark git commit: [SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

Repository: spark
Updated Branches:
  refs/heads/master fc0c8c971 -> c8bee932c


[SPARK-24677][CORE] Avoid NoSuchElementException from MedianHeap

## What changes were proposed in this pull request?
When speculation is enabled,
TaskSetManager#markPartitionCompleted should write the successful task's duration to MedianHeap,
not just increase tasksSuccessful.

Otherwise, when TaskSetManager#checkSpeculatableTasks runs, tasksSuccessful is non-zero but MedianHeap is empty,
so successfulTaskDurations.median throws java.util.NoSuchElementException: MedianHeap is empty.
This ultimately leads to stopping the SparkContext.
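
To make the failure mode concrete, here is a minimal, self-contained Scala sketch. It does not use Spark's actual classes (the real MedianHeap lives in org.apache.spark.util.collection and is private[spark]); SimpleMedianHeap and the surrounding names are simplified stand-ins, and the 1.5 multiplier is just an illustrative speculation threshold. It shows why bumping tasksSuccessful without recording a duration breaks the invariant that checkSpeculatableTasks relies on, and how inserting the duration restores it:

```scala
import scala.collection.mutable

// Simplified stand-in for Spark's MedianHeap: it only needs to reproduce the
// empty-heap behavior described above.
class SimpleMedianHeap {
  private val values = mutable.ArrayBuffer.empty[Double]
  def insert(v: Double): Unit = values += v
  def isEmpty: Boolean = values.isEmpty
  def median: Double = {
    if (values.isEmpty) {
      throw new NoSuchElementException("MedianHeap is empty.")
    }
    values.sorted.apply(values.size / 2)
  }
}

object MedianHeapBugSketch {
  def main(args: Array[String]): Unit = {
    val successfulTaskDurations = new SimpleMedianHeap
    var tasksSuccessful = 0

    // Pre-fix markPartitionCompleted: the success counter is bumped, but no
    // duration is inserted into the heap.
    tasksSuccessful += 1

    // Speculation-check code assumes tasksSuccessful > 0 implies the heap has
    // entries, so reading the median throws.
    try {
      val threshold = successfulTaskDurations.median * 1.5
      println(s"speculation threshold: $threshold")
    } catch {
      case e: NoSuchElementException =>
        println(s"reproduced the bug: ${e.getMessage}")
    }

    // Post-fix markPartitionCompleted: record the duration as well, keeping
    // the counter and the heap in sync.
    successfulTaskDurations.insert(42.0)
    println(s"median after fix: ${successfulTaskDurations.median}")
  }
}
```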
## How was this patch tested?
Added a unit test to TaskSetManagerSuite.scala: "[SPARK-24677] Avoid NoSuchElementException from MedianHeap".

Author: sychen <sy...@ctrip.com>

Closes #21656 from cxzl25/fix_MedianHeap_empty.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8bee932
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8bee932
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8bee932

Branch: refs/heads/master
Commit: c8bee932cb644627c4049b5a07dd8028968572d9
Parents: fc0c8c9
Author: sychen <sy...@ctrip.com>
Authored: Wed Jul 18 13:24:41 2018 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Wed Jul 18 13:24:41 2018 -0500

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  7 ++-
 .../apache/spark/scheduler/TaskSetManager.scala |  7 ++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 49 ++++++++++++++++++++
 3 files changed, 59 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 598b62f..56c0bf6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -697,9 +697,12 @@ private[spark] class TaskSchedulerImpl(
    * do not also submit those same tasks.  That also means that a task completion from an earlier
    * attempt can lead to the entire stage getting marked as successful.
    */
-  private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
+  private[scheduler] def markPartitionCompletedInAllTaskSets(
+      stageId: Int,
+      partitionId: Int,
+      taskInfo: TaskInfo) = {
     taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
-      tsm.markPartitionCompleted(partitionId)
+      tsm.markPartitionCompleted(partitionId, taskInfo)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a18c665..6071605 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -758,7 +758,7 @@ private[spark] class TaskSetManager(
     }
     // There may be multiple tasksets for this stage -- we let all of them know that the partition
     // was completed.  This may result in some of the tasksets getting completed.
-    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId)
+    sched.markPartitionCompletedInAllTaskSets(stageId, tasks(index).partitionId, info)
     // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
     // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
     // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
@@ -769,9 +769,12 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  private[scheduler] def markPartitionCompleted(partitionId: Int): Unit = {
+  private[scheduler] def markPartitionCompleted(partitionId: Int, taskInfo: TaskInfo): Unit = {
     partitionToIndex.get(partitionId).foreach { index =>
       if (!successful(index)) {
+        if (speculationEnabled && !isZombie) {
+          successfulTaskDurations.insert(taskInfo.duration)
+        }
         tasksSuccessful += 1
         successful(index) = true
         if (tasksSuccessful == numTasks) {

http://git-wip-us.apache.org/repos/asf/spark/blob/c8bee932/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index ca6a7e5..ae571e5 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -1365,6 +1365,55 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
   }
 
+  test("[SPARK-24677] Avoid NoSuchElementException from MedianHeap") {
+    val conf = new SparkConf().set("spark.speculation", "true")
+    sc = new SparkContext("local", "test", conf)
+    // Set the speculation multiplier to be 0 so speculative tasks are launched immediately
+    sc.conf.set("spark.speculation.multiplier", "0.0")
+    sc.conf.set("spark.speculation.quantile", "0.1")
+    sc.conf.set("spark.speculation", "true")
+
+    sched = new FakeTaskScheduler(sc)
+    sched.initialize(new FakeSchedulerBackend())
+
+    val dagScheduler = new FakeDAGScheduler(sc, sched)
+    sched.setDAGScheduler(dagScheduler)
+
+    val taskSet1 = FakeTask.createTaskSet(10)
+    val accumUpdatesByTask: Array[Seq[AccumulatorV2[_, _]]] = taskSet1.tasks.map { task =>
+      task.metrics.internalAccums
+    }
+
+    sched.submitTasks(taskSet1)
+    sched.resourceOffers(
+      (0 until 10).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    val taskSetManager1 = sched.taskSetManagerForAttempt(0, 0).get
+
+    // fail fetch
+    taskSetManager1.handleFailedTask(
+      taskSetManager1.taskAttempts.head.head.taskId, TaskState.FAILED,
+      FetchFailed(null, 0, 0, 0, "fetch failed"))
+
+    assert(taskSetManager1.isZombie)
+    assert(taskSetManager1.runningTasks === 9)
+
+    val taskSet2 = FakeTask.createTaskSet(10, stageAttemptId = 1)
+    sched.submitTasks(taskSet2)
+    sched.resourceOffers(
+      (11 until 20).map { idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) })
+
+    // Complete 2 of the tasks, leaving the other 8 running
+    for (id <- Set(0, 1)) {
+      taskSetManager1.handleSuccessfulTask(id, createTaskResult(id, accumUpdatesByTask(id)))
+      assert(sched.endedTasks(id) === Success)
+    }
+
+    val taskSetManager2 = sched.taskSetManagerForAttempt(0, 1).get
+    assert(!taskSetManager2.successfulTaskDurations.isEmpty())
+    taskSetManager2.checkSpeculatableTasks(0)
+  }
+
   private def createTaskResult(
       id: Int,
       accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org