You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/06 20:50:10 UTC

[09/12] git commit: Tightly couple stageIdToJobIds and jobIdToStageIds

Tightly couple stageIdToJobIds and jobIdToStageIds


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9ae2d094
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9ae2d094
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9ae2d094

Branch: refs/heads/master
Commit: 9ae2d094a967782e3f5a624dd854059a40430ee6
Parents: 27c45e5
Author: Mark Hamstra <ma...@gmail.com>
Authored: Fri Nov 22 13:14:26 2013 -0800
Committer: Mark Hamstra <ma...@gmail.com>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 29 ++++++++------------
 1 file changed, 12 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9ae2d094/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index aeac14a..01c5133 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -259,7 +259,7 @@ class DAGScheduler(
     val stage =
       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
     stageIdToStage(id) = stage
-    registerJobIdWithStages(jobId, stage)
+    updateJobIdStageIdMaps(jobId, stage)
     stageToInfos(stage) = new StageInfo(stage)
     stage
   }
@@ -348,30 +348,24 @@ class DAGScheduler(
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
    */
-  private def registerJobIdWithStages(jobId: Int, stage: Stage) {
-    def registerJobIdWithStageList(stages: List[Stage]) {
+  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
+    def updateJobIdStageIdMapsList(stages: List[Stage]) {
       if (!stages.isEmpty) {
         val s = stages.head
         stageIdToJobIds.getOrElseUpdate(s.id, new HashSet[Int]()) += jobId
+        jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id
         val parents = getParentStages(s.rdd, jobId)
         val parentsWithoutThisJobId = parents.filter(p => !stageIdToJobIds.get(p.id).exists(_.contains(jobId)))
-        registerJobIdWithStageList(parentsWithoutThisJobId ++ stages.tail)
+        updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)
       }
     }
-    registerJobIdWithStageList(List(stage))
+    updateJobIdStageIdMapsList(List(stage))
   }
 
-  private def jobIdToStageIdsAdd(jobId: Int) {
-    val stageSet = jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]())
-    stageIdToJobIds.foreach { case (stageId, jobSet) =>
-      if (jobSet.contains(jobId)) {
-        stageSet += stageId
-      }
-    }
-  }
-
-  // Removes job and any stages that are not needed by any other job.  Returns the set of ids for stages that
-  // were removed.  The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+  /**
+   * Removes job and any stages that are not needed by any other job.  Returns the set of ids for stages that
+   * were removed.  The associated tasks for those stages need to be cancelled if we got here via job cancellation.
+   */
   private def removeJobAndIndependentStages(jobId: Int): Set[Int] = {
     val registeredStages = jobIdToStageIds(jobId)
     val independentStages = new HashSet[Int]()
@@ -555,7 +549,6 @@ class DAGScheduler(
           idToActiveJob(jobId) = job
           activeJobs += job
           resultStageToJob(finalStage) = job
-          jobIdToStageIdsAdd(jobId)
           listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))
           submitStage(finalStage)
         }
@@ -605,9 +598,11 @@ class DAGScheduler(
         handleTaskCompletion(completion)
 
       case LocalJobCompleted(stage) =>
+        val jobId = stageIdToJobIds(stage.id).head
         stageIdToJobIds -= stage.id    // clean up data structures that were populated for a local job,
         stageIdToStage -= stage.id     // but that won't get cleaned up via the normal paths through
         stageToInfos -= stage          // completion events or stage abort
+        jobIdToStageIds -= jobId
 
       case TaskSetFailed(taskSet, reason) =>
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }