You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/12/01 08:39:21 UTC

[1/4] git commit: Bugfix: SPARK-965 & SPARK-966

Updated Branches:
  refs/heads/master 743a31a7c -> 60e23a58b


Bugfix: SPARK-965 & SPARK-966

SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966

* Add back DAGScheduler.start(); eventProcessActor is created and started here.

  Note that this function is only called by SparkContext.

* Cancel the scheduled stage resubmission task when stopping eventProcessActor

* Add a new DAGSchedulerEvent ResubmitFailedStages

  This event message is sent by the scheduled stage resubmission task to eventProcessActor.  In this way, DAGScheduler.resubmitFailedStages is guaranteed to be executed from the same thread that runs DAGScheduler.processEvent.

  Please refer to the discussion in SPARK-966 for details.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/18def5d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/18def5d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/18def5d6

Branch: refs/heads/master
Commit: 18def5d6f20b33c946f9b8b2cea8cfb6848dcc34
Parents: 743a31a
Author: Lian, Cheng <rh...@gmail.com>
Authored: Thu Nov 28 17:46:06 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Thu Nov 28 17:46:06 2013 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  1 +
 .../apache/spark/scheduler/DAGScheduler.scala   | 62 ++++++++++++--------
 .../spark/scheduler/DAGSchedulerEvent.scala     |  2 +
 3 files changed, 40 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18def5d6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3a80241..c314f01 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -270,6 +270,7 @@ class SparkContext(
   taskScheduler.start()
 
   @volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
+  dagScheduler.start()
 
   ui.start()
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18def5d6/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4457525..e2bf08c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -113,30 +113,7 @@ class DAGScheduler(
   // Warns the user if a stage contains a task with size greater than this value (in KB)
   val TASK_SIZE_TO_WARN = 100
 
-  private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
-    override def preStart() {
-      context.system.scheduler.schedule(RESUBMIT_TIMEOUT milliseconds, RESUBMIT_TIMEOUT milliseconds) {
-        if (failed.size > 0) {
-          resubmitFailedStages()
-        }
-      }
-    }
-
-    /**
-     * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
-     * events and responds by launching tasks. This runs in a dedicated thread and receives events
-     * via the eventQueue.
-     */
-    def receive = {
-      case event: DAGSchedulerEvent =>
-        logDebug("Got event of type " + event.getClass.getName)
-
-        if (!processEvent(event))
-          submitWaitingStages()
-        else
-          context.stop(self)
-    }
-  }))
+  private var eventProcessActor: ActorRef = _
 
   private[scheduler] val nextJobId = new AtomicInteger(0)
 
@@ -177,6 +154,34 @@ class DAGScheduler(
 
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
+  def start() {
+    eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+      var resubmissionTask: Cancellable = _
+
+      override def preStart() {
+        resubmissionTask = context.system.scheduler.schedule(
+          RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
+      }
+
+      /**
+       * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
+       * events and responds by launching tasks. This runs in a dedicated thread and receives events
+       * via the eventQueue.
+       */
+      def receive = {
+        case event: DAGSchedulerEvent =>
+          logDebug("Got event of type " + event.getClass.getName)
+
+          if (!processEvent(event)) {
+            submitWaitingStages()
+          } else {
+            resubmissionTask.cancel()
+            context.stop(self)
+          }
+      }
+    }))
+  }
+
   def addSparkListener(listener: SparkListener) {
     listenerBus.addListener(listener)
   }
@@ -457,6 +462,11 @@ class DAGScheduler(
       case TaskSetFailed(taskSet, reason) =>
         abortStage(stageIdToStage(taskSet.stageId), reason)
 
+      case ResubmitFailedStages =>
+        if (failed.size > 0) {
+          resubmitFailedStages()
+        }
+
       case StopDAGScheduler =>
         // Cancel any active jobs
         for (job <- activeJobs) {
@@ -900,7 +910,9 @@ class DAGScheduler(
   }
 
   def stop() {
-    eventProcessActor ! StopDAGScheduler
+    if (eventProcessActor != null) {
+      eventProcessActor ! StopDAGScheduler
+    }
     metadataCleaner.cancel()
     taskSched.stop()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18def5d6/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 708d221..5353cd2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -73,4 +73,6 @@ private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerE
 private[scheduler]
 case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
 
+private[scheduler] case object ResubmitFailedStages extends DAGSchedulerEvent
+
 private[scheduler] case object StopDAGScheduler extends DAGSchedulerEvent


[2/4] git commit: Updated some inline comments in DAGScheduler

Posted by rx...@apache.org.
Updated some inline comments in DAGScheduler


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1e250860
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1e250860
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1e250860

Branch: refs/heads/master
Commit: 1e25086009ff6421790609e406d00e1b978d6dbe
Parents: 18def5d
Author: Lian, Cheng <rh...@gmail.com>
Authored: Fri Nov 29 15:56:47 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Fri Nov 29 15:56:47 2013 +0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 31 ++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e250860/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index e2bf08c..08cf763 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -154,24 +154,43 @@ class DAGScheduler(
 
   val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
 
+  /**
+   * Starts the event processing actor.  The actor has two responsibilities:
+   *
+   * 1. Waits for events like job submission, task finished, task failure etc., and calls
+   *    [[org.apache.spark.scheduler.DAGScheduler.processEvent()]] to process them.
+   * 2. Schedules a periodical task to resubmit failed stages.
+   *
+   * NOTE: the actor cannot be started in the constructor, because the periodical task references
+   * some internal states of the enclosing [[org.apache.spark.scheduler.DAGScheduler]] object, thus
+   * cannot be scheduled until the [[org.apache.spark.scheduler.DAGScheduler]] is fully constructed.
+   */
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
       var resubmissionTask: Cancellable = _
 
       override def preStart() {
+        /**
+         * A message is sent to the actor itself periodically to remind the actor to resubmit failed
+         * stages.  In this way, stage resubmission can be done within the same thread context of
+         * other event processing logic to avoid unnecessary synchronization overhead.
+         */
         resubmissionTask = context.system.scheduler.schedule(
           RESUBMIT_TIMEOUT.millis, RESUBMIT_TIMEOUT.millis, self, ResubmitFailedStages)
       }
 
       /**
-       * The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
-       * events and responds by launching tasks. This runs in a dedicated thread and receives events
-       * via the eventQueue.
+       * The main event loop of the DAG scheduler.
        */
       def receive = {
         case event: DAGSchedulerEvent =>
           logDebug("Got event of type " + event.getClass.getName)
 
+          /**
+           * All events are forwarded to `processEvent()`, so that the event processing logic can
+           * be easily tested without starting a dedicated actor.  Please refer to
+           * `DAGSchedulerSuite` for details.
+           */
           if (!processEvent(event)) {
             submitWaitingStages()
           } else {
@@ -383,8 +402,10 @@ class DAGScheduler(
   }
 
   /**
-   * Process one event retrieved from the event queue.
-   * Returns true if we should stop the event loop.
+   * Process one event retrieved from the event processing actor.
+   *
+   * @param event The event to be processed.
+   * @return `true` if we should stop the event loop.
    */
   private[scheduler] def processEvent(event: DAGSchedulerEvent): Boolean = {
     event match {


[3/4] git commit: More comments

Posted by rx...@apache.org.
More comments


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4a1d966e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4a1d966e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4a1d966e

Branch: refs/heads/master
Commit: 4a1d966e26e56fc5d42a828f414b4eca433c3a22
Parents: 1e25086
Author: Lian, Cheng <rh...@gmail.com>
Authored: Fri Nov 29 16:02:58 2013 +0800
Committer: Lian, Cheng <rh...@gmail.com>
Committed: Fri Nov 29 16:02:58 2013 +0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4a1d966e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 08cf763..bc37a70 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -167,6 +167,9 @@ class DAGScheduler(
    */
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
+      /**
+       * A handle to the periodical task, used to cancel the task when the actor is stopped.
+       */
       var resubmissionTask: Cancellable = _
 
       override def preStart() {


[4/4] git commit: Merge pull request #216 from liancheng/fix-spark-966

Posted by rx...@apache.org.
Merge pull request #216 from liancheng/fix-spark-966

Bugfix: SPARK-965 & SPARK-966

SPARK-965: https://spark-project.atlassian.net/browse/SPARK-965
SPARK-966: https://spark-project.atlassian.net/browse/SPARK-966

* Add back `DAGScheduler.start()`; `eventProcessActor` is created and started here.

  Note that this function is only called by `SparkContext`.

* Cancel the scheduled stage resubmission task when stopping `eventProcessActor`

* Add a new `DAGSchedulerEvent` `ResubmitFailedStages`

  This event message is sent by the scheduled stage resubmission task to `eventProcessActor`.  In this way, `DAGScheduler.resubmitFailedStages()` is guaranteed to be executed from the same thread that runs `DAGScheduler.processEvent()`.

  Please refer to discussion in [SPARK-966](https://spark-project.atlassian.net/browse/SPARK-966) for details.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/60e23a58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/60e23a58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/60e23a58

Branch: refs/heads/master
Commit: 60e23a58b288dae3c87da28e1506323b1d88ee9e
Parents: 743a31a 4a1d966
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Nov 30 23:38:49 2013 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Nov 30 23:38:49 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  1 +
 .../apache/spark/scheduler/DAGScheduler.scala   | 90 ++++++++++++++------
 .../spark/scheduler/DAGSchedulerEvent.scala     |  2 +
 3 files changed, 66 insertions(+), 27 deletions(-)
----------------------------------------------------------------------