Posted to commits@spark.apache.org by an...@apache.org on 2016/03/14 20:32:18 UTC

spark git commit: [SPARK-13054] Always post TaskEnd event for tasks

Repository: spark
Updated Branches:
  refs/heads/master e06493cb7 -> 23385e853


[SPARK-13054] Always post TaskEnd event for tasks

I am using dynamic container allocation and speculation and am seeing issues with the active task accounting. The Executor UI still shows active tasks on an executor even though the job/stage has completed. I think it's also preventing dynamic allocation from releasing containers, because it believes there are still tasks running.
There are multiple issues with this:
-  If the task end event (in this case probably from a speculative task) comes in after the stage is finished, DAGScheduler.handleTaskCompletion skips posting the task completion event; the listener sketch below illustrates the resulting accounting problem.
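
To make the accounting problem concrete, here is a minimal sketch (not part of the patch) of a listener that does its own active-task bookkeeping, similar in spirit to the Executor UI and dynamic allocation. The ActiveTaskListener name is hypothetical:

import scala.collection.mutable

import org.apache.spark.scheduler._

// Hypothetical listener doing its own active-task accounting. If a
// SparkListenerTaskEnd is dropped, the count bumped in onTaskStart is
// never decremented and the executor appears busy forever.
class ActiveTaskListener extends SparkListener {
  private val activeTasksPerExecutor = mutable.Map.empty[String, Int].withDefaultValue(0)

  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
    activeTasksPerExecutor(taskStart.taskInfo.executorId) += 1
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val execId = taskEnd.taskInfo.executorId
    activeTasksPerExecutor(execId) -= 1
    if (activeTasksPerExecutor(execId) <= 0) {
      activeTasksPerExecutor.remove(execId)  // executor is idle again
    }
  }

  // Executors that still appear to have tasks running.
  def busyExecutors: Set[String] = synchronized {
    activeTasksPerExecutor.keySet.toSet
  }
}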

Author: Thomas Graves <tg...@prevailsail.corp.gq1.yahoo.com>
Author: Thomas Graves <tg...@staydecay.corp.gq1.yahoo.com>
Author: Tom Graves <tg...@yahoo-inc.com>

Closes #10951 from tgravescs/SPARK-11701.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23385e85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23385e85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23385e85

Branch: refs/heads/master
Commit: 23385e853e7ca54332c6098cf83da7d0723546fe
Parents: e06493c
Author: Thomas Graves <tg...@prevailsail.corp.gq1.yahoo.com>
Authored: Mon Mar 14 12:31:46 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Mar 14 12:31:46 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 16 +++---
 .../spark/scheduler/DAGSchedulerSuite.scala     | 58 ++++++++++++++++++++
 2 files changed, 65 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23385e85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b576d4c..8a36af2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1148,13 +1148,13 @@ class DAGScheduler(
         null
       }
 
-    // The success case is dealt with separately below.
-    // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
-    if (event.reason != Success) {
-      val attemptId = task.stageAttemptId
-      listenerBus.post(SparkListenerTaskEnd(
-        stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-    }
+    // The stage may have already finished when we get this event -- e.g. maybe it was a
+    // speculative task. It is important that we post the TaskEnd event in any case, so
+    // listeners are properly notified and can choose to handle it. For instance, some
+    // listeners do their own task accounting; if they never get the task end event, they
+    // think tasks are still running when they really aren't.
+    listenerBus.post(SparkListenerTaskEnd(
+      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))
 
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
@@ -1164,8 +1164,6 @@ class DAGScheduler(
     val stage = stageIdToStage(task.stageId)
     event.reason match {
       case Success =>
-        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
         stage.pendingPartitions -= task.partitionId
         task match {
           case rt: ResultTask[_, _] =>

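
The change above posts the TaskEnd event unconditionally, before the stageIdToStage check, so listeners see every completion, including late speculative ones. A hypothetical end-to-end sketch using the ActiveTaskListener from earlier (master URL and names are illustrative, not part of the patch):

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical driver program exercising the listener sketched earlier.
object ActiveTaskAccountingDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("task-end-accounting")
    val sc = new SparkContext(conf)
    val listener = new ActiveTaskListener
    sc.addSparkListener(listener)

    sc.parallelize(1 to 1000, 8).map(_ * 2).count()

    sc.stop()  // stop() drains the listener bus, delivering any pending TaskEnd events
    // With the fix, no executor should be left looking busy once the job is done.
    assert(listener.busyExecutors.isEmpty, s"still busy: ${listener.busyExecutors}")
  }
}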
http://git-wip-us.apache.org/repos/asf/spark/blob/23385e85/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index d1c7143..55f4190 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     val successfulStages = new HashSet[Int]
     val failedStages = new ArrayBuffer[Int]
     val stageByOrderOfExecution = new ArrayBuffer[Int]
+    val endedTasks = new HashSet[Long]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
       submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         failedStages += stageInfo.stageId
       }
     }
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
+      endedTasks += taskEnd.taskInfo.taskId
+    }
   }
 
   var mapOutputTracker: MapOutputTrackerMaster = null
@@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     sparkListener.submittedStageInfos.clear()
     sparkListener.successfulStages.clear()
     sparkListener.failedStages.clear()
+    sparkListener.endedTasks.clear()
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assert(countSubmittedMapStageAttempts() === 2)
   }
 
+  test("task events always posted in speculation / when stage is killed") {
+    val baseRdd = new MyRDD(sc, 4, Nil)
+    val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
+    submit(finalRdd, Array(0, 1, 2, 3))
+
+    // complete two tasks
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(0), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(1), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    // verify stage exists
+    assert(scheduler.stageIdToStage.contains(0))
+    assert(sparkListener.endedTasks.size == 2)
+
+    // finish the other two tasks
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(2), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size == 4)
+
+    // verify the stage is done
+    assert(!scheduler.stageIdToStage.contains(0))
+
+    // The stage should be complete. Finish one more successful task, to simulate what can
+    // happen with a speculative task, and make sure the event is sent out.
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), Success, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size == 5)
+
+    // make sure unsuccessful tasks also send out the event
+    runEvent(makeCompletionEvent(
+      taskSets(0).tasks(3), UnknownReason, 42,
+      Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
+    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    assert(sparkListener.endedTasks.size == 6)
+  }
+
   test("ignore late map task completions") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1944,6 +1996,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     info
   }
 
+  private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
+    val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
+    info.finishTime = 1  // to prevent spurious errors in JobProgressListener
+    info
+  }
+
   private def makeCompletionEvent(
       task: Task[_],
       reason: TaskEndReason,

