You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/01 08:39:22 UTC

[2/4] git commit: Updated some inline comments in DAGScheduler

Updated some inline comments in DAGScheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1e250860
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1e250860
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1e250860

Branch: refs/heads/master
Commit: 1e25086009ff6421790609e406d00e1b978d6dbe
Parents: 18def5d
Author: Lian, Cheng <rh...@gmail.com>
Authored: Fri Nov 29 15:56:47 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Fri Nov 29 15:56:47 2013 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 31 ++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e250860/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2bf08c..08cf763 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -154,24 +154,43 @@ class DAGScheduler(
 
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
+  /**
+   * Starts the event processing actor.  The actor has two responsibilities:
+   *
+   * 1. Waits for events like job submission, task finished, task failure etc., and calls
+   *    [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+   * 2. Schedules a periodic task to resubmit failed stages.
+   *
+   * NOTE: the actor cannot be started in the constructor, because the periodic task references
+   * some internal state of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, so it
+   * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+   */
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
       var resubmissionTask: Cancellable = _
 
       override def preStart() {
+        /**
+         * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+         * stages.  In this way, stage resubmission can be done within the same thread context as
+         * other event processing logic to avoid unnecessary synchronization overhead.
+         */
         resubmissionTask = context.system.scheduler.schedule(
           RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
       }
 
       /**
-       * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
-       * events and responds by launching tasks. This runs in a dedicated thread and receives events
-       * via the eventQueue.
+       * The main event loop of the DAG scheduler.
        */
       def receive = {
         case event: DAGSchedulerEvent =>
           logDebug("Got event of type " + event.getClass.getName)
 
+          /**
+           * All events are forwarded to `processEvent()`, so that the event processing logic can
+           * be easily tested without starting a dedicated actor.  Please refer to
+           * `DAGSchedulerSuite` for details.
+           */
           if (!processEvent(event)) {
             submitWaitingStages()
           } else {
@@ -383,8 +402,10 @@ class DAGScheduler(
   }
 
   /**
-   * Process one event retrieved from the event queue.
-   * Returns true if we should stop the event loop.
+   * Process one event retrieved from the event processing actor.
+   *
+   * @param event The event to be processed.
+   * @return `true` if we should stop the event loop.
    */
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {