You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/14 01:50:01 UTC
[3/5] git commit: Put the periodical resubmitFailedStages() call into a scheduled task
Put the periodical resubmitFailedStages() call into a scheduled task
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ba552851
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ba552851
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ba552851
Branch: refs/heads/master
Commit: ba552851771cf8eaf90b72b661c3df60080d0ef9
Parents: 765ebca
Author: Lian, Cheng <rh...@gmail.com>
Authored: Mon Nov 11 01:25:35 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Mon Nov 11 01:25:35 2013 +0800
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 28 +++++++++-----------
1 file changed, 12 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ba552851/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a73a6e1..7499570 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import akka.actor.{Props, Actor, ActorRef}
+import akka.actor._
+import akka.util.duration._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import org.apache.spark._
@@ -110,6 +111,13 @@ class DAGScheduler(
val POLL_TIMEOUT = 10L
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
+ override def preStart() {
+ env.actorSystem.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
+ if (failed.size > 0)
+ resubmitFailedStages()
+ }
+ }
+
/**
* The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
* events and responds by launching tasks. This runs in a dedicated thread and receives events
@@ -119,22 +127,10 @@ class DAGScheduler(
case event: DAGSchedulerEvent =>
logDebug("Got event of type " + event.getClass.getName)
- if (!processEvent(event)) {
- val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
- // Periodically resubmit failed stages if some map output fetches have failed and we have
- // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
- // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
- // the same time, so we want to make sure we've identified all the reduce tasks that depend
- // on the failed node.
- if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
- resubmitFailedStages()
- } else {
- submitWaitingStages()
- }
- }
- else {
+ if (!processEvent(event))
+ submitWaitingStages()
+ else
context.stop(self)
- }
}
}))