Posted to commits@spark.apache.org by ka...@apache.org on 2015/05/27 18:32:36 UTC

spark git commit: [SPARK-7878] Rename Stage.jobId to firstJobId

Repository: spark
Updated Branches:
  refs/heads/master 4615081d7 -> ff0ddff46


[SPARK-7878] Rename Stage.jobId to firstJobId

The previous name was confusing: a stage can be associated with many
jobs, and jobId was just the ID of the first job that was associated
with the stage. This commit also renames some of the method parameters
in DAGScheduler.scala to clarify when a jobId refers to the first job
ID associated with the stage (as opposed to the jobId of the job that
is currently being scheduled).
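
For illustration, here is a minimal, self-contained Scala sketch of the
semantics the rename captures (SketchStage is a hypothetical stand-in,
not Spark's real Stage class): a stage can end up serving several jobs,
and firstJobId only records the first of them.

    // SketchStage is a hypothetical stand-in, not Spark's Stage class.
    case class SketchStage(id: Int, firstJobId: Int)

    object FirstJobIdDemo extends App {
      // Job 0 submits the shuffle stage, so it records firstJobId = 0.
      val shared = SketchStage(id = 5, firstJobId = 0)
      // Job 1 later reuses the same shuffle output, so the same stage
      // also serves job 1 -- but firstJobId still reads 0, not 1.
      val jobsUsingStage = Set(0, 1)
      assert(shared.firstJobId == 0)
      assert(jobsUsingStage.contains(shared.firstJobId))
      println(s"Stage ${shared.id} serves jobs $jobsUsingStage; " +
        s"first was job ${shared.firstJobId}")
    }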

cc markhamstra JoshRosen (hopefully this will help prevent future bugs like SPARK-6880)

Author: Kay Ousterhout <ka...@gmail.com>

Closes #6418 from kayousterhout/SPARK-7878 and squashes the following commits:

b71a9b8 [Kay Ousterhout] [SPARK-7878] Rename Stage.jobId to firstJobId


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff0ddff4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff0ddff4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff0ddff4

Branch: refs/heads/master
Commit: ff0ddff46935ae3d036b7dbc437fff8a6c19d6a4
Parents: 4615081
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed May 27 09:32:29 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed May 27 09:32:29 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/DAGScheduler.scala   | 58 +++++++++-----------
 .../apache/spark/scheduler/ResultStage.scala    |  4 +-
 .../spark/scheduler/ShuffleMapStage.scala       |  4 +-
 .../org/apache/spark/scheduler/Stage.scala      |  4 +-
 4 files changed, 33 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff0ddff4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5d81291..a083be2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -208,19 +208,17 @@ class DAGScheduler(
 
   /**
    * Get or create a shuffle map stage for the given shuffle dependency's map side.
-   * The jobId value passed in will be used if the stage doesn't already exist with
-   * a lower jobId (jobId always increases across jobs.)
    */
   private def getShuffleMapStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int): ShuffleMapStage = {
+      firstJobId: Int): ShuffleMapStage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
         // We are going to register ancestor shuffle dependencies
-        registerShuffleDependencies(shuffleDep, jobId)
+        registerShuffleDependencies(shuffleDep, firstJobId)
         // Then register current shuffleDep
-        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
+        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
 
         stage
@@ -230,15 +228,15 @@ class DAGScheduler(
   /**
    * Helper function to eliminate some code re-use when creating new stages.
    */
-  private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
-    val parentStages = getParentStages(rdd, jobId)
+  private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
+    val parentStages = getParentStages(rdd, firstJobId)
     val id = nextStageId.getAndIncrement()
     (parentStages, id)
   }
 
   /**
    * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
-   * newOrUsedShuffleStage.  The stage will be associated with the provided jobId.
+   * newOrUsedShuffleStage.  The stage will be associated with the provided firstJobId.
    * Production of shuffle map stages should always use newOrUsedShuffleStage, not
    * newShuffleMapStage directly.
    */
@@ -246,21 +244,19 @@ class DAGScheduler(
       rdd: RDD[_],
       numTasks: Int,
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int,
+      firstJobId: Int,
       callSite: CallSite): ShuffleMapStage = {
-    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)
     val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
-      jobId, callSite, shuffleDep)
+      firstJobId, callSite, shuffleDep)
 
     stageIdToStage(id) = stage
-    updateJobIdStageIdMaps(jobId, stage)
+    updateJobIdStageIdMaps(firstJobId, stage)
     stage
   }
 
   /**
-   * Create a ResultStage -- either directly for use as a result stage, or as part of the
-   * (re)-creation of a shuffle map stage in newOrUsedShuffleStage.  The stage will be associated
-   * with the provided jobId.
+   * Create a ResultStage associated with the provided jobId.
    */
   private def newResultStage(
       rdd: RDD[_],
@@ -277,16 +273,16 @@ class DAGScheduler(
 
   /**
    * Create a shuffle map Stage for the given RDD.  The stage will also be associated with the
-   * provided jobId.  If a stage for the shuffleId existed previously so that the shuffleId is
+   * provided firstJobId.  If a stage for the shuffleId existed previously so that the shuffleId is
    * present in the MapOutputTracker, then the number and location of available outputs are
    * recovered from the MapOutputTracker
    */
   private def newOrUsedShuffleStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int): ShuffleMapStage = {
+      firstJobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
     val numTasks = rdd.partitions.size
-    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
+    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)
     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
@@ -304,10 +300,10 @@ class DAGScheduler(
   }
 
   /**
-   * Get or create the list of parent stages for a given RDD. The stages will be assigned the
-   * provided jobId if they haven't already been created with a lower jobId.
+   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
+   * the provided firstJobId.
    */
-  private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
+  private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
     val parents = new HashSet[Stage]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
@@ -321,7 +317,7 @@ class DAGScheduler(
         for (dep <- r.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              parents += getShuffleMapStage(shufDep, jobId)
+              parents += getShuffleMapStage(shufDep, firstJobId)
             case _ =>
               waitingForVisit.push(dep.rdd)
           }
@@ -336,11 +332,11 @@ class DAGScheduler(
   }
 
   /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
-  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
+  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {
     val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
     while (parentsWithNoMapStage.nonEmpty) {
       val currentShufDep = parentsWithNoMapStage.pop()
-      val stage = newOrUsedShuffleStage(currentShufDep, jobId)
+      val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)
       shuffleToMapStage(currentShufDep.shuffleId) = stage
     }
   }
@@ -390,7 +386,7 @@ class DAGScheduler(
           for (dep <- rdd.dependencies) {
             dep match {
               case shufDep: ShuffleDependency[_, _, _] =>
-                val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+                val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
                 if (!mapStage.isAvailable) {
                   missing += mapStage
                 }
@@ -577,7 +573,7 @@ class DAGScheduler(
 
   private[scheduler] def doCancelAllJobs() {
     // Cancel all running jobs.
-    runningStages.map(_.jobId).foreach(handleJobCancellation(_,
+    runningStages.map(_.firstJobId).foreach(handleJobCancellation(_,
       reason = "as part of cancellation of all jobs"))
     activeJobs.clear() // These should already be empty by this point,
     jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
@@ -603,7 +599,7 @@ class DAGScheduler(
       clearCacheLocs()
       val failedStagesCopy = failedStages.toArray
       failedStages.clear()
-      for (stage <- failedStagesCopy.sortBy(_.jobId)) {
+      for (stage <- failedStagesCopy.sortBy(_.firstJobId)) {
         submitStage(stage)
       }
     }
@@ -623,7 +619,7 @@ class DAGScheduler(
     logTrace("failed: " + failedStages)
     val waitingStagesCopy = waitingStages.toArray
     waitingStages.clear()
-    for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
+    for (stage <- waitingStagesCopy.sortBy(_.firstJobId)) {
       submitStage(stage)
     }
   }
@@ -843,7 +839,7 @@ class DAGScheduler(
       }
     }
 
-    val properties = jobIdToActiveJob.get(stage.jobId).map(_.properties).orNull
+    val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull
 
     runningStages += stage
     // SparkListenerStageSubmitted should be posted before testing whether tasks are
@@ -909,7 +905,7 @@ class DAGScheduler(
       stage.pendingTasks ++= tasks
       logDebug("New pending tasks: " + stage.pendingTasks)
       taskScheduler.submitTasks(
-        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
+        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))
       stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
@@ -1323,7 +1319,7 @@ class DAGScheduler(
         for (dep <- rdd.dependencies) {
           dep match {
             case shufDep: ShuffleDependency[_, _, _] =>
-              val mapStage = getShuffleMapStage(shufDep, stage.jobId)
+              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
               if (!mapStage.isAvailable) {
                 waitingForVisit.push(mapStage.rdd)
               }  // Otherwise there's no need to follow the dependency back
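
The getShuffleMapStage hunk above shows the core pattern behind the
rename: look the stage up by shuffleId, and only create it (tagging it
with firstJobId) on a miss. Below is a simplified sketch of that
get-or-create behavior, with hypothetical types rather than the real
DAGScheduler code:

    import scala.collection.mutable

    // Simplified sketch of getShuffleMapStage's memoization.
    // MapStageSketch is hypothetical, not Spark's ShuffleMapStage;
    // firstJobId is only consulted when the stage must be created.
    object GetOrCreateSketch {
      case class MapStageSketch(shuffleId: Int, firstJobId: Int)

      private val shuffleToMapStage =
        mutable.Map.empty[Int, MapStageSketch]

      def getShuffleMapStage(shuffleId: Int, firstJobId: Int): MapStageSketch =
        shuffleToMapStage.getOrElseUpdate(
          shuffleId, MapStageSketch(shuffleId, firstJobId))

      def main(args: Array[String]): Unit = {
        val s1 = getShuffleMapStage(shuffleId = 7, firstJobId = 0) // job 0 creates
        val s2 = getShuffleMapStage(shuffleId = 7, firstJobId = 3) // job 3 reuses
        assert(s1 eq s2)           // same cached stage object
        assert(s2.firstJobId == 0) // keeps the creating job's ID
      }
    }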

http://git-wip-us.apache.org/repos/asf/spark/blob/ff0ddff4/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
index c0f3d5a..bf81b9a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -28,9 +28,9 @@ private[spark] class ResultStage(
     rdd: RDD[_],
     numTasks: Int,
     parents: List[Stage],
-    jobId: Int,
+    firstJobId: Int,
     callSite: CallSite)
-  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
 
   // The active job for this result stage. Will be empty if the job has already finished
   // (e.g., because the job was cancelled).

http://git-wip-us.apache.org/repos/asf/spark/blob/ff0ddff4/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
index d022107..66c75f3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -30,10 +30,10 @@ private[spark] class ShuffleMapStage(
     rdd: RDD[_],
     numTasks: Int,
     parents: List[Stage],
-    jobId: Int,
+    firstJobId: Int,
     callSite: CallSite,
     val shuffleDep: ShuffleDependency[_, _, _])
-  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite) {
 
   override def toString: String = "ShuffleMapStage " + id
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ff0ddff4/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 5d0ddb8..c59d6e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -34,7 +34,7 @@ import org.apache.spark.util.CallSite
  * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
  * that each output partition is on.
  *
- * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
+ * Each Stage also has a firstJobId, identifying the job that first submitted the stage.  When FIFO
  * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
  * faster on failure.
  *
@@ -51,7 +51,7 @@ private[spark] abstract class Stage(
     val rdd: RDD[_],
     val numTasks: Int,
     val parents: List[Stage],
-    val jobId: Int,
+    val firstJobId: Int,
     val callSite: CallSite)
   extends Logging {
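
As the Stage.scala comment above notes, firstJobId is what FIFO
scheduling keys on: the sortBy(_.firstJobId) calls in the DAGScheduler
hunks submit stages needed by earlier jobs first. A small sketch of
that ordering (DemoStage is a hypothetical stand-in):

    // DemoStage is a hypothetical stand-in for Spark's Stage class.
    case class DemoStage(id: Int, firstJobId: Int)

    object FifoOrderDemo extends App {
      val waiting = Seq(DemoStage(10, 2), DemoStage(11, 0), DemoStage(12, 1))
      // Mirrors waitingStagesCopy.sortBy(_.firstJobId): stages first
      // needed by earlier jobs are submitted first.
      val inOrder = waiting.sortBy(_.firstJobId)
      assert(inOrder.map(_.firstJobId) == Seq(0, 1, 2))
      inOrder.foreach(s =>
        println(s"submit stage ${s.id} (first job ${s.firstJobId})"))
    }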
 

