You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/01 08:39:22 UTC
[2/4] git commit: Updated some inline comments in DAGScheduler
Updated some inline comments in DAGScheduler
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1e250860
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1e250860
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1e250860
Branch: refs/heads/master
Commit: 1e25086009ff6421790609e406d00e1b978d6dbe
Parents: 18def5d
Author: Lian, Cheng <rh...@gmail.com>
Authored: Fri Nov 29 15:56:47 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Fri Nov 29 15:56:47 2013 +0800
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 31 ++++++++++++++++----
1 file changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e250860/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2bf08c..08cf763 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -154,24 +154,43 @@ class DAGScheduler(
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
+ /**
+ * Starts the event processing actor. The actor has two responsibilities:
+ *
+ * 1. Waits for events like job submission, task finished, task failure etc., and calls
+ * [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+ * 2. Schedules a periodic task to resubmit failed stages.
+ *
+ * NOTE: the actor cannot be started in the constructor, because the periodic task references
+ * internal state of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, and thus
+ * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+ */
def start() {
eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
var resubmissionTask: Cancellable = _
override def preStart() {
+ /**
+ * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+ * stages. In this way, stage resubmission is done in the same thread context as the
+ * other event processing logic, avoiding unnecessary synchronization overhead.
+ */
resubmissionTask = context.system.scheduler.schedule(
RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
}
/**
- * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
- * events and responds by launching tasks. This runs in a dedicated thread and receives events
- * via the eventQueue.
+ * The main event loop of the DAG scheduler.
*/
def receive = {
case event: DAGSchedulerEvent =>
logDebug("Got event of type " + event.getClass.getName)
+ /**
+ * All events are forwarded to `processEvent()`, so that the event processing logic can
+ * be easily tested without starting a dedicated actor. Please refer to
+ * `DAGSchedulerSuite` for details.
+ */
if (!processEvent(event)) {
submitWaitingStages()
} else {
@@ -383,8 +402,10 @@ class DAGScheduler(
}
/**
- * Process one event retrieved from the event queue.
- * Returns true if we should stop the event loop.
+ * Process one event retrieved from the event processing actor.
+ *
+ * @param event The event to be processed.
+ * @return `true` if we should stop the event loop.
*/
private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
event match {