You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/02/07 01:15:30 UTC

git commit: Merge pull request #450 from kayousterhout/fetch_failures. Closes #450.

Updated Branches:
  refs/heads/master 18ad59e2c -> 0b448df6a


Merge pull request #450 from kayousterhout/fetch_failures. Closes #450.

Only run ResubmitFailedStages event after a fetch fails

Previously, a ResubmitFailedStages event was sent to the DAGScheduler
event loop every 200 milliseconds, whether or not a fetch had failed,
leading to a lot of unnecessary event processing and clogged
DAGScheduler logs.

Author: Kay Ousterhout <ka...@gmail.com>

== Merge branch commits ==

commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <ka...@gmail.com>
Date:   Wed Feb 5 11:34:41 2014 -0800

    Re-add check for empty set of failed stages

commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <ka...@gmail.com>
Date:   Wed Jan 15 23:35:41 2014 -0800

    Only run ResubmitFailedStages event after a fetch fails

    Previously, a ResubmitFailedStages event was sent to the DAGScheduler
    event loop every 200 milliseconds, whether or not a fetch had failed,
    leading to a lot of unnecessary event processing and clogged
    DAGScheduler logs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0b448df6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0b448df6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0b448df6

Branch: refs/heads/master
Commit: 0b448df6ac520a7977b1eb51e8c55e33f3fd2da8
Parents: 18ad59e
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Thu Feb 6 16:15:24 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Feb 6 16:15:24 2014 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 33 +++++++-------------
 1 file changed, 11 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b448df6/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 8212415..21d16fa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -155,7 +155,6 @@ class DAGScheduler(
   val failed = new HashSet[Stage]  // Stages that must be resubmitted due to fetch failures
   // Missing tasks from each stage
   val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
-  var lastFetchFailureTime: Long = 0  // Used to wait a bit to avoid repeated resubmits
 
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
@@ -177,22 +176,6 @@ class DAGScheduler(
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
       /**
-       * A handle to the periodical task, used to cancel the task when the actor is stopped.
-       */
-      var resubmissionTask: Cancellable = _
-
-      override def preStart() {
-        import context.dispatcher
-        /**
-         * A message is sent to the actor itself periodically to remind the actor to resubmit failed
-         * stages.  In this way, stage resubmission can be done within the same thread context of
-         * other event processing logic to avoid unnecessary synchronization overhead.
-         */
-        resubmissionTask = context.system.scheduler.schedule(
-          RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
-      }
-
-      /**
        * The main event loop of the DAG scheduler.
        */
       def receive = {
@@ -207,7 +190,6 @@ class DAGScheduler(
           if (!processEvent(event)) {
             submitWaitingStages()
           } else {
-            resubmissionTask.cancel()
             context.stop(self)
           }
       }
@@ -620,6 +602,8 @@ class DAGScheduler(
 
       case ResubmitFailedStages =>
         if (failed.size > 0) {
+          // Failed stages may be removed by job cancellation, so failed might be empty even if
+          // the ResubmitFailedStages event has been scheduled.
           resubmitFailedStages()
         }
 
@@ -926,7 +910,6 @@ class DAGScheduler(
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
         running -= failedStage
-        failed += failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
           ") for resubmision due to a fetch failure")
@@ -938,10 +921,16 @@ class DAGScheduler(
         }
         logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
           "); marking it for resubmission")
+        if (failed.isEmpty && eventProcessActor != null) {
+          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+          // in that case the event will already have been scheduled. eventProcessActor may be
+          // null during unit tests.
+          import env.actorSystem.dispatcher
+          env.actorSystem.scheduler.scheduleOnce(
+            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+        }
+        failed += failedStage
         failed += mapStage
-        // Remember that a fetch failed now; this is used to resubmit the broken
-        // stages later, after a small wait (to give other tasks the chance to fail)
-        lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))