You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/14 01:50:01 UTC

[3/5] git commit: Put the periodical resubmitFailedStages() call into a scheduled task

Put the periodical resubmitFailedStages() call into a scheduled task


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ba552851
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ba552851
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ba552851

Branch: refs/heads/master
Commit: ba552851771cf8eaf90b72b661c3df60080d0ef9
Parents: 765ebca
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Nov 11 01:25:35 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Nov 11 01:25:35 2013 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 28 +++++++++-----------
 1 file changed, 12 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba552851/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a73a6e1..7499570 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import java.io.NotSerializableException
 import java.util.Properties
 import java.util.concurrent.atomic.AtomicInteger
 
-import akka.actor.{Props, Actor, ActorRef}
+import akka.actor._
+import akka.util.duration._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 
 import org.apache.spark._
@@ -110,6 +111,13 @@ class DAGScheduler(
   val POLL_TIMEOUT = 10L
 
   private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+    override def preStart() {
+      env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+        if (failed.size > 0)
+          resubmitFailedStages()
+      }
+    }
+
     /**
      * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
      * events and responds by launching tasks. This runs in a dedicated thread and receives events
@@ -119,22 +127,10 @@ class DAGScheduler(
       case event: DAGSchedulerEvent =>
         logDebug("Got event of type " + event.getClass.getName)
 
-        if (!processEvent(event)) {
-          val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
-          // Periodically resubmit failed stages if some map output fetches have failed and we have
-          // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
-          // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
-          // the same time, so we want to make sure we've identified all the reduce tasks that depend
-          // on the failed node.
-          if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
-            resubmitFailedStages()
-          } else {
-            submitWaitingStages()
-          }
-        }
-        else {
+        if (!processEvent(event))
+          submitWaitingStages()
+        else
           context.stop(self)
-        }
     }
   }))