Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/01 12:09:04 UTC

spark git commit: [SPARK-4655][Core] Split Stage into ShuffleMapStage and ResultStage subclasses

Repository: spark
Updated Branches:
  refs/heads/master 305abe1e5 -> ff1915e12


[SPARK-4655][Core] Split Stage into ShuffleMapStage and ResultStage subclasses

Hi all - this patch changes the Stage class into an abstract class and introduces two new subclasses that extend it, ShuffleMapStage and ResultStage, with the goal of making the DAGScheduler class easier to read. DAGScheduler is updated to use the new classes.
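
For readers skimming the diff, here is a condensed, self-contained sketch of the shape of the change. It is illustrative only: the real classes live in org.apache.spark.scheduler and their constructors take RDD, CallSite and ShuffleDependency parameters (see the full definitions in the diff below); the object name StageHierarchySketch, the simplified shuffleId field and the describe helper are made up for this sketch.

object StageHierarchySketch {
  // Simplified stand-in for org.apache.spark.scheduler.Stage, now abstract.
  abstract class Stage(val id: Int, val numTasks: Int, val parents: List[Stage])

  // Final stage of a job: computes a result for an action.
  class ResultStage(id: Int, numTasks: Int, parents: List[Stage])
    extends Stage(id, numTasks, parents) {
    override def toString: String = "ResultStage " + id
  }

  // Intermediate stage: writes map output for a shuffle.
  class ShuffleMapStage(id: Int, numTasks: Int, parents: List[Stage], val shuffleId: Int)
    extends Stage(id, numTasks, parents) {
    override def toString: String = "ShuffleMapStage " + id
  }

  // DAGScheduler-style dispatch: pattern matching on the concrete stage type
  // replaces the old `if (stage.isShuffleMap)` branches.
  def describe(stage: Stage): String = stage match {
    case s: ShuffleMapStage => s"$s writes map output for shuffle ${s.shuffleId}"
    case s: ResultStage     => s"$s computes a job's final result"
  }

  def main(args: Array[String]): Unit = {
    val mapStage    = new ShuffleMapStage(0, numTasks = 4, parents = Nil, shuffleId = 7)
    val resultStage = new ResultStage(1, numTasks = 4, parents = List(mapStage))
    println(describe(mapStage))     // ShuffleMapStage 0 writes map output for shuffle 7
    println(describe(resultStage))  // ResultStage 1 computes a job's final result
  }
}

The split also moves the shuffle-only state (output locations, shuffleDep) out of the shared base class, so DAGScheduler can match on the subtype instead of consulting an isShuffleMap flag, as the diff below shows.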

Author: Ilya Ganelin <il...@capitalone.com>
Author: Ilya Ganelin <il...@gmail.com>

Closes #4708 from ilganeli/SPARK-4655 and squashes the following commits:

c248924 [Ilya Ganelin] Merge branch 'SPARK-4655' of github.com:ilganeli/spark into SPARK-4655
d930385 [Ilya Ganelin] Fixed merge conflict from
a9a765f [Ilya Ganelin] Update DAGScheduler.scala
c03563c [Ilya Ganelin] Minor fixes
c39e971 [Ilya Ganelin] Added return typing for public methods
845bc87 [Ilya Ganelin] Merge branch 'SPARK-4655' of github.com:ilganeli/spark into SPARK-4655
e8031d8 [Ilya Ganelin] Minor string fixes
4ec53ac [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-4655
c004f62 [Ilya Ganelin] Update DAGScheduler.scala
a2cb03f [Ilya Ganelin] [SPARK-4655] Replaced usages of Nil and eliminated some code reuse
3d5cf20 [Ilya Ganelin] [SPARK-4655] Moved mima exclude to 1.4
6912c55 [Ilya Ganelin] Resolved merge conflict
4bff208 [Ilya Ganelin] Minor stylistic fixes
c6fffbb [Ilya Ganelin] newline
41402ad [Ilya Ganelin] Style fixes
02c6981 [Ilya Ganelin] Merge branch 'SPARK-4655' of github.com:ilganeli/spark into SPARK-4655
c755a09 [Ilya Ganelin] Some more stylistic updates and minor refactoring
b6257a0 [Ilya Ganelin] Update MimaExcludes.scala
0f0c624 [Ilya Ganelin] Fixed merge conflict
2eba262 [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-4655
6b43d7b [Ilya Ganelin] Got rid of some spaces
6f1a5db [Ilya Ganelin] Revert "More minor formatting and refactoring"
1b3471b [Ilya Ganelin] Merge remote-tracking branch 'upstream/master' into SPARK-4655
c9288e2 [Ilya Ganelin] More minor formatting and refactoring
d548caf [Ilya Ganelin] Formatting fix
c3ae5c2 [Ilya Ganelin] Explicit typing
0dacaf3 [Ilya Ganelin] Got rid of stale import
6da3a71 [Ilya Ganelin] Trailing whitespace
b85c5fe [Ilya Ganelin] Added minor fixes
a57dfcd [Ilya Ganelin] Added MiMA exclusion to get around binary compatibility check
83ed849 [Ilya Ganelin] moved braces for consistency
96dd161 [Ilya Ganelin] Fixed minor style error
cfd6f10 [Ilya Ganelin] Updated DAGScheduler to use new ResultStage and ShuffleMapStage classes
83494e9 [Ilya Ganelin] Added new Stage classes


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ff1915e1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ff1915e1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ff1915e1

Branch: refs/heads/master
Commit: ff1915e12edc4d23e0b4e88933429c2d3470f3d9
Parents: 305abe1
Author: Ilya Ganelin <il...@capitalone.com>
Authored: Wed Apr 1 11:09:00 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Apr 1 11:09:00 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/ActiveJob.scala  |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 297 ++++++++++---------
 .../apache/spark/scheduler/ResultStage.scala    |  40 +++
 .../spark/scheduler/ShuffleMapStage.scala       |  84 ++++++
 .../org/apache/spark/scheduler/Stage.scala      |  65 +---
 project/MimaExcludes.scala                      |   6 +-
 6 files changed, 298 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ff1915e1/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index b755d8f..50a6937 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -27,7 +27,7 @@ import org.apache.spark.util.CallSite
  */
 private[spark] class ActiveJob(
     val jobId: Int,
-    val finalStage: Stage,
+    val finalStage: ResultStage,
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
     val callSite: CallSite,

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1915e1/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b405bd3..d35b4f9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -83,7 +83,7 @@ class DAGScheduler(
 
   private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
   private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
-  private[scheduler] val shuffleToMapStage = new HashMap[Int, Stage]
+  private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
   private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
 
   // Stages we need to run whose parents aren't done
@@ -150,7 +150,7 @@ class DAGScheduler(
       result: Any,
       accumUpdates: Map[Long, Any],
       taskInfo: TaskInfo,
-      taskMetrics: TaskMetrics) {
+      taskMetrics: TaskMetrics): Unit = {
     eventProcessLoop.post(
       CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
   }
@@ -173,18 +173,18 @@ class DAGScheduler(
   }
 
   // Called by TaskScheduler when an executor fails.
-  def executorLost(execId: String) {
+  def executorLost(execId: String): Unit = {
     eventProcessLoop.post(ExecutorLost(execId))
   }
 
   // Called by TaskScheduler when a host is added
-  def executorAdded(execId: String, host: String) {
+  def executorAdded(execId: String, host: String): Unit = {
     eventProcessLoop.post(ExecutorAdded(execId, host))
   }
 
   // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
   // cancellation of the job itself.
-  def taskSetFailed(taskSet: TaskSet, reason: String) {
+  def taskSetFailed(taskSet: TaskSet, reason: String): Unit = {
     eventProcessLoop.post(TaskSetFailed(taskSet, reason))
   }
 
@@ -210,40 +210,65 @@ class DAGScheduler(
    * The jobId value passed in will be used if the stage doesn't already exist with
    * a lower jobId (jobId always increases across jobs.)
    */
-  private def getShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): Stage = {
+  private def getShuffleMapStage(
+      shuffleDep: ShuffleDependency[_, _, _],
+      jobId: Int): ShuffleMapStage = {
     shuffleToMapStage.get(shuffleDep.shuffleId) match {
       case Some(stage) => stage
       case None =>
         // We are going to register ancestor shuffle dependencies
         registerShuffleDependencies(shuffleDep, jobId)
         // Then register current shuffleDep
-        val stage =
-          newOrUsedStage(
-            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
-            shuffleDep.rdd.creationSite)
+        val stage = newOrUsedShuffleStage(shuffleDep, jobId)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
- 
+
         stage
     }
   }
 
   /**
-   * Create a Stage -- either directly for use as a result stage, or as part of the (re)-creation
-   * of a shuffle map stage in newOrUsedStage.  The stage will be associated with the provided
-   * jobId. Production of shuffle map stages should always use newOrUsedStage, not newStage
-   * directly.
+   * Helper function to eliminate some code re-use when creating new stages.
    */
-  private def newStage(
+  private def getParentStagesAndId(rdd: RDD[_], jobId: Int): (List[Stage], Int) = {
+    val parentStages = getParentStages(rdd, jobId)
+    val id = nextStageId.getAndIncrement()
+    (parentStages, id)
+  }
+
+  /**
+   * Create a ShuffleMapStage as part of the (re)-creation of a shuffle map stage in
+   * newOrUsedShuffleStage.  The stage will be associated with the provided jobId.
+   * Production of shuffle map stages should always use newOrUsedShuffleStage, not
+   * newShuffleMapStage directly.
+   */
+  private def newShuffleMapStage(
       rdd: RDD[_],
       numTasks: Int,
-      shuffleDep: Option[ShuffleDependency[_, _, _]],
+      shuffleDep: ShuffleDependency[_, _, _],
       jobId: Int,
-      callSite: CallSite)
-    : Stage =
-  {
-    val parentStages = getParentStages(rdd, jobId)
-    val id = nextStageId.getAndIncrement()
-    val stage = new Stage(id, rdd, numTasks, shuffleDep, parentStages, jobId, callSite)
+      callSite: CallSite): ShuffleMapStage = {
+    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,
+      jobId, callSite, shuffleDep)
+
+    stageIdToStage(id) = stage
+    updateJobIdStageIdMaps(jobId, stage)
+    stage
+  }
+
+  /**
+   * Create a ResultStage -- either directly for use as a result stage, or as part of the
+   * (re)-creation of a shuffle map stage in newOrUsedShuffleStage.  The stage will be associated
+   * with the provided jobId.
+   */
+  private def newResultStage(
+      rdd: RDD[_],
+      numTasks: Int,
+      jobId: Int,
+      callSite: CallSite): ResultStage = {
+    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
+    val stage: ResultStage = new ResultStage(id, rdd, numTasks, parentStages, jobId, callSite)
+
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
     stage
@@ -255,20 +280,17 @@ class DAGScheduler(
    * present in the MapOutputTracker, then the number and location of available outputs are
    * recovered from the MapOutputTracker
    */
-  private def newOrUsedStage(
-      rdd: RDD[_],
-      numTasks: Int,
+  private def newOrUsedShuffleStage(
       shuffleDep: ShuffleDependency[_, _, _],
-      jobId: Int,
-      callSite: CallSite)
-    : Stage =
-  {
-    val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
+      jobId: Int): ShuffleMapStage = {
+    val rdd = shuffleDep.rdd
+    val numTasks = rdd.partitions.size
+    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, jobId, rdd.creationSite)
     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
       val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
       val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
       for (i <- 0 until locs.size) {
-        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
+        stage.outputLocs(i) = Option(locs(i)).toList // locs(i) will be null if missing
       }
       stage.numAvailableOutputs = locs.count(_ != null)
     } else {
@@ -306,26 +328,23 @@ class DAGScheduler(
       }
     }
     waitingForVisit.push(rdd)
-    while (!waitingForVisit.isEmpty) {
+    while (waitingForVisit.nonEmpty) {
       visit(waitingForVisit.pop())
     }
     parents.toList
   }
 
-  // Find ancestor missing shuffle dependencies and register into shuffleToMapStage
-  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = {
+  /** Find ancestor missing shuffle dependencies and register into shuffleToMapStage */
+  private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) {
     val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
-    while (!parentsWithNoMapStage.isEmpty) {
+    while (parentsWithNoMapStage.nonEmpty) {
       val currentShufDep = parentsWithNoMapStage.pop()
-      val stage =
-        newOrUsedStage(
-          currentShufDep.rdd, currentShufDep.rdd.partitions.size, currentShufDep, jobId,
-          currentShufDep.rdd.creationSite)
+      val stage = newOrUsedShuffleStage(currentShufDep, jobId)
       shuffleToMapStage(currentShufDep.shuffleId) = stage
     }
   }
 
-  // Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet
+  /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getAncestorShuffleDependencies(rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
     val parents = new Stack[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
@@ -351,7 +370,7 @@ class DAGScheduler(
     }
 
     waitingForVisit.push(rdd)
-    while (!waitingForVisit.isEmpty) {
+    while (waitingForVisit.nonEmpty) {
       visit(waitingForVisit.pop())
     }
     parents
@@ -382,7 +401,7 @@ class DAGScheduler(
       }
     }
     waitingForVisit.push(stage.rdd)
-    while (!waitingForVisit.isEmpty) {
+    while (waitingForVisit.nonEmpty) {
       visit(waitingForVisit.pop())
     }
     missing.toList
@@ -392,7 +411,7 @@ class DAGScheduler(
    * Registers the given jobId among the jobs that need the given stage and
    * all of that stage's ancestors.
    */
-  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage) {
+  private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {
     def updateJobIdStageIdMapsList(stages: List[Stage]) {
       if (stages.nonEmpty) {
         val s = stages.head
@@ -412,7 +431,7 @@ class DAGScheduler(
    *
    * @param job The job whose state to cleanup.
    */
-  private def cleanupStateForJobAndIndependentStages(job: ActiveJob) {
+  private def cleanupStateForJobAndIndependentStages(job: ActiveJob): Unit = {
     val registeredStages = jobIdToStageIds.get(job.jobId)
     if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
       logError("No stages registered for job " + job.jobId)
@@ -474,8 +493,7 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null): JobWaiter[U] =
-  {
+      properties: Properties = null): JobWaiter[U] = {
     // Check to make sure we are not launching a task on a partition that does not exist.
     val maxPartitions = rdd.partitions.length
     partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
@@ -504,15 +522,13 @@ class DAGScheduler(
       callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
-      properties: Properties = null)
-  {
+      properties: Properties = null): Unit = {
     val start = System.nanoTime
     val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
     waiter.awaitResult() match {
-      case JobSucceeded => {
+      case JobSucceeded =>
         logInfo("Job %d finished: %s, took %f s".format
           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
-      }
       case JobFailed(exception: Exception) =>
         logInfo("Job %d failed: %s, took %f s".format
           (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
@@ -526,9 +542,7 @@ class DAGScheduler(
       evaluator: ApproximateEvaluator[U, R],
       callSite: CallSite,
       timeout: Long,
-      properties: Properties = null)
-    : PartialResult[R] =
-  {
+      properties: Properties = null): PartialResult[R] = {
     val listener = new ApproximateActionListener(rdd, func, evaluator, timeout)
     val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
     val partitions = (0 until rdd.partitions.size).toArray
@@ -541,12 +555,12 @@ class DAGScheduler(
   /**
    * Cancel a job that is running or waiting in the queue.
    */
-  def cancelJob(jobId: Int) {
+  def cancelJob(jobId: Int): Unit = {
     logInfo("Asked to cancel job " + jobId)
     eventProcessLoop.post(JobCancelled(jobId))
   }
 
-  def cancelJobGroup(groupId: String) {
+  def cancelJobGroup(groupId: String): Unit = {
     logInfo("Asked to cancel job group " + groupId)
     eventProcessLoop.post(JobGroupCancelled(groupId))
   }
@@ -554,7 +568,7 @@ class DAGScheduler(
   /**
    * Cancel all jobs that are running or waiting in the queue.
    */
-  def cancelAllJobs() {
+  def cancelAllJobs(): Unit = {
     eventProcessLoop.post(AllJobsCancelled)
   }
 
@@ -722,13 +736,12 @@ class DAGScheduler(
       allowLocal: Boolean,
       callSite: CallSite,
       listener: JobListener,
-      properties: Properties = null)
-  {
-    var finalStage: Stage = null
+      properties: Properties = null) {
+    var finalStage: ResultStage = null
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
       // HadoopRDD whose underlying HDFS files have been deleted.
-      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
+      finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)
     } catch {
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -773,7 +786,7 @@ class DAGScheduler(
       if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
         val missing = getMissingParentStages(stage).sortBy(_.id)
         logDebug("missing: " + missing)
-        if (missing == Nil) {
+        if (missing.isEmpty) {
           logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
           submitMissingTasks(stage, jobId.get)
         } else {
@@ -794,13 +807,15 @@ class DAGScheduler(
     // Get our pending tasks and remember them in our pendingTasks entry
     stage.pendingTasks.clear()
 
+
     // First figure out the indexes of partition ids to compute.
     val partitionsToCompute: Seq[Int] = {
-      if (stage.isShuffleMap) {
-        (0 until stage.numPartitions).filter(id => stage.outputLocs(id) == Nil)
-      } else {
-        val job = stage.resultOfJob.get
-        (0 until job.numPartitions).filter(id => !job.finished(id))
+      stage match {
+        case stage: ShuffleMapStage =>
+          (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)
+        case stage: ResultStage =>
+          val job = stage.resultOfJob.get
+          (0 until job.numPartitions).filter(id => !job.finished(id))
       }
     }
 
@@ -830,18 +845,21 @@ class DAGScheduler(
     try {
       // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
       // For ResultTask, serialize and broadcast (rdd, func).
-      val taskBinaryBytes: Array[Byte] =
-        if (stage.isShuffleMap) {
-          closureSerializer.serialize((stage.rdd, stage.shuffleDep.get) : AnyRef).array()
-        } else {
-          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func) : AnyRef).array()
-        }
+      val taskBinaryBytes: Array[Byte] = stage match {
+        case stage: ShuffleMapStage =>
+          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
+        case stage: ResultStage =>
+          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
+      }
+
       taskBinary = sc.broadcast(taskBinaryBytes)
     } catch {
       // In the case of a failure during serialization, abort the stage.
       case e: NotSerializableException =>
         abortStage(stage, "Task not serializable: " + e.toString)
         runningStages -= stage
+
+        // Abort execution
         return
       case NonFatal(e) =>
         abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")
@@ -849,20 +867,22 @@ class DAGScheduler(
         return
     }
 
-    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
-      partitionsToCompute.map { id =>
-        val locs = getPreferredLocs(stage.rdd, id)
-        val part = stage.rdd.partitions(id)
-        new ShuffleMapTask(stage.id, taskBinary, part, locs)
-      }
-    } else {
-      val job = stage.resultOfJob.get
-      partitionsToCompute.map { id =>
-        val p: Int = job.partitions(id)
-        val part = stage.rdd.partitions(p)
-        val locs = getPreferredLocs(stage.rdd, p)
-        new ResultTask(stage.id, taskBinary, part, locs, id)
-      }
+    val tasks: Seq[Task[_]] = stage match {
+      case stage: ShuffleMapStage =>
+        partitionsToCompute.map { id =>
+          val locs = getPreferredLocs(stage.rdd, id)
+          val part = stage.rdd.partitions(id)
+          new ShuffleMapTask(stage.id, taskBinary, part, locs)
+        }
+
+      case stage: ResultStage =>
+        val job = stage.resultOfJob.get
+        partitionsToCompute.map { id =>
+          val p: Int = job.partitions(id)
+          val part = stage.rdd.partitions(p)
+          val locs = getPreferredLocs(stage.rdd, p)
+          new ResultTask(stage.id, taskBinary, part, locs, id)
+        }
     }
 
     if (tasks.size > 0) {
@@ -877,8 +897,17 @@ class DAGScheduler(
       // SparkListenerStageCompleted here in case there are no tasks to run.
       outputCommitCoordinator.stageEnd(stage.id)
       listenerBus.post(SparkListenerStageCompleted(stage.latestInfo))
-      logDebug("Stage " + stage + " is actually done; %b %d %d".format(
-        stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
+
+      val debugString = stage match {
+        case stage: ShuffleMapStage =>
+          s"Stage ${stage} is actually done; " +
+            s"(available: ${stage.isAvailable}," +
+            s"available outputs: ${stage.numAvailableOutputs}," +
+            s"partitions: ${stage.numPartitions})"
+        case stage : ResultStage =>
+          s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
+      }
+      logDebug(debugString)
       runningStages -= stage
     }
   }
@@ -968,7 +997,10 @@ class DAGScheduler(
         stage.pendingTasks -= task
         task match {
           case rt: ResultTask[_, _] =>
-            stage.resultOfJob match {
+            // Cast to ResultStage here because it's part of the ResultTask
+            // TODO Refactor this out to a function that accepts a ResultStage
+            val resultStage = stage.asInstanceOf[ResultStage]
+            resultStage.resultOfJob match {
               case Some(job) =>
                 if (!job.finished(rt.outputId)) {
                   updateAccumulators(event)
@@ -976,7 +1008,7 @@ class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    markStageAsFinished(stage)
+                    markStageAsFinished(resultStage)
                     cleanupStateForJobAndIndependentStages(job)
                     listenerBus.post(
                       SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
@@ -988,7 +1020,7 @@ class DAGScheduler(
                     job.listener.taskSucceeded(rt.outputId, event.result)
                   } catch {
                     case e: Exception =>
-                      // TODO: Perhaps we want to mark the stage as failed?
+                      // TODO: Perhaps we want to mark the resultStage as failed?
                       job.listener.jobFailed(new SparkDriverExecutionException(e))
                   }
                 }
@@ -997,6 +1029,7 @@ class DAGScheduler(
             }
 
           case smt: ShuffleMapTask =>
+            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
             updateAccumulators(event)
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
@@ -1004,50 +1037,54 @@ class DAGScheduler(
             if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
               logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
             } else {
-              stage.addOutputLoc(smt.partitionId, status)
+              shuffleStage.addOutputLoc(smt.partitionId, status)
             }
-            if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
-              markStageAsFinished(stage)
+            if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
+              markStageAsFinished(shuffleStage)
               logInfo("looking for newly runnable stages")
               logInfo("running: " + runningStages)
               logInfo("waiting: " + waitingStages)
               logInfo("failed: " + failedStages)
-              if (stage.shuffleDep.isDefined) {
-                // We supply true to increment the epoch number here in case this is a
-                // recomputation of the map outputs. In that case, some nodes may have cached
-                // locations with holes (from when we detected the error) and will need the
-                // epoch incremented to refetch them.
-                // TODO: Only increment the epoch number if this is not the first time
-                //       we registered these map outputs.
-                mapOutputTracker.registerMapOutputs(
-                  stage.shuffleDep.get.shuffleId,
-                  stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
-                  changeEpoch = true)
-              }
+
+              // We supply true to increment the epoch number here in case this is a
+              // recomputation of the map outputs. In that case, some nodes may have cached
+              // locations with holes (from when we detected the error) and will need the
+              // epoch incremented to refetch them.
+              // TODO: Only increment the epoch number if this is not the first time
+              //       we registered these map outputs.
+              mapOutputTracker.registerMapOutputs(
+                shuffleStage.shuffleDep.shuffleId,
+                shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
+                changeEpoch = true)
+
               clearCacheLocs()
-              if (stage.outputLocs.exists(_ == Nil)) {
-                // Some tasks had failed; let's resubmit this stage
+              if (shuffleStage.outputLocs.contains(Nil)) {
+                // Some tasks had failed; let's resubmit this shuffleStage
                 // TODO: Lower-level scheduler should also deal with this
-                logInfo("Resubmitting " + stage + " (" + stage.name +
+                logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                   ") because some of its tasks had failed: " +
-                  stage.outputLocs.zipWithIndex.filter(_._1 == Nil).map(_._2).mkString(", "))
-                submitStage(stage)
+                  shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)
+                      .map(_._2).mkString(", "))
+                submitStage(shuffleStage)
               } else {
                 val newlyRunnable = new ArrayBuffer[Stage]
-                for (stage <- waitingStages) {
-                  logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
+                for (shuffleStage <- waitingStages) {
+                  logInfo("Missing parents for " + shuffleStage + ": " +
+                    getMissingParentStages(shuffleStage))
                 }
-                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
-                  newlyRunnable += stage
+                for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty)
+                {
+                  newlyRunnable += shuffleStage
                 }
                 waitingStages --= newlyRunnable
                 runningStages ++= newlyRunnable
                 for {
-                  stage <- newlyRunnable.sortBy(_.id)
-                  jobId <- activeJobForStage(stage)
+                  shuffleStage <- newlyRunnable.sortBy(_.id)
+                  jobId <- activeJobForStage(shuffleStage)
                 } {
-                  logInfo("Submitting " + stage + " (" + stage.rdd + "), which is now runnable")
-                  submitMissingTasks(stage, jobId)
+                  logInfo("Submitting " + shuffleStage + " (" +
+                    shuffleStage.rdd + "), which is now runnable")
+                  submitMissingTasks(shuffleStage, jobId)
                 }
               }
             }
@@ -1204,9 +1241,7 @@ class DAGScheduler(
     }
   }
 
-  /**
-   * Fails a job and all stages that are only used by that job, and cleans up relevant state.
-   */
+  /** Fails a job and all stages that are only used by that job, and cleans up relevant state. */
   private def failJobAndIndependentStages(job: ActiveJob, failureReason: String) {
     val error = new SparkException(failureReason)
     var ableToCancelStages = true
@@ -1254,9 +1289,7 @@ class DAGScheduler(
     }
   }
 
-  /**
-   * Return true if one of stage's ancestors is target.
-   */
+  /** Return true if one of stage's ancestors is target. */
   private def stageDependsOn(stage: Stage, target: Stage): Boolean = {
     if (stage == target) {
       return true
@@ -1282,7 +1315,7 @@ class DAGScheduler(
       }
     }
     waitingForVisit.push(stage.rdd)
-    while (!waitingForVisit.isEmpty) {
+    while (waitingForVisit.nonEmpty) {
       visit(waitingForVisit.pop())
     }
     visitedRdds.contains(target.rdd)
@@ -1312,9 +1345,7 @@ class DAGScheduler(
   private def getPreferredLocsInternal(
       rdd: RDD[_],
       partition: Int,
-      visited: HashSet[(RDD[_],Int)])
-    : Seq[TaskLocation] =
-  {
+      visited: HashSet[(RDD[_],Int)]): Seq[TaskLocation] = {
     // If the partition has already been visited, no need to re-visit.
     // This avoids exponential path exploration.  SPARK-695
     if (!visited.add((rdd,partition))) {
@@ -1323,12 +1354,12 @@ class DAGScheduler(
     }
     // If the partition is cached, return the cache locations
     val cached = getCacheLocs(rdd)(partition)
-    if (!cached.isEmpty) {
+    if (cached.nonEmpty) {
       return cached
     }
     // If the RDD has some placement preferences (as is the case for input RDDs), get those
     val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
-    if (!rddPrefs.isEmpty) {
+    if (rddPrefs.nonEmpty) {
       return rddPrefs.map(TaskLocation(_))
     }
     // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
@@ -1412,7 +1443,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     dagScheduler.sc.stop()
   }
 
-  override def onStop() {
+  override def onStop(): Unit = {
     // Cancel any active jobs in postStop hook
     dagScheduler.cleanUpAfterSchedulerStop()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1915e1/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
new file mode 100644
index 0000000..c0f3d5a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite
+
+/**
+ * The ResultStage represents the final stage in a job.
+ */
+private[spark] class ResultStage(
+    id: Int,
+    rdd: RDD[_],
+    numTasks: Int,
+    parents: List[Stage],
+    jobId: Int,
+    callSite: CallSite)
+  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+
+  // The active job for this result stage. Will be empty if the job has already finished
+  // (e.g., because the job was cancelled).
+  var resultOfJob: Option[ActiveJob] = None
+
+  override def toString: String = "ResultStage " + id
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1915e1/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
new file mode 100644
index 0000000..d022107
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite
+
+/**
+ * The ShuffleMapStage represents the intermediate stages in a job.
+ */
+private[spark] class ShuffleMapStage(
+    id: Int,
+    rdd: RDD[_],
+    numTasks: Int,
+    parents: List[Stage],
+    jobId: Int,
+    callSite: CallSite,
+    val shuffleDep: ShuffleDependency[_, _, _])
+  extends Stage(id, rdd, numTasks, parents, jobId, callSite) {
+
+  override def toString: String = "ShuffleMapStage " + id
+
+  var numAvailableOutputs: Long = 0
+
+  def isAvailable: Boolean = numAvailableOutputs == numPartitions
+
+  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
+
+  def addOutputLoc(partition: Int, status: MapStatus): Unit = {
+    val prevList = outputLocs(partition)
+    outputLocs(partition) = status :: prevList
+    if (prevList == Nil) {
+      numAvailableOutputs += 1
+    }
+  }
+
+  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit = {
+    val prevList = outputLocs(partition)
+    val newList = prevList.filterNot(_.location == bmAddress)
+    outputLocs(partition) = newList
+    if (prevList != Nil && newList == Nil) {
+      numAvailableOutputs -= 1
+    }
+  }
+
+  /**
+   * Removes all shuffle outputs associated with this executor. Note that this will also remove
+   * outputs which are served by an external shuffle server (if one exists), as they are still
+   * registered with this execId.
+   */
+  def removeOutputsOnExecutor(execId: String): Unit = {
+    var becameUnavailable = false
+    for (partition <- 0 until numPartitions) {
+      val prevList = outputLocs(partition)
+      val newList = prevList.filterNot(_.location.executorId == execId)
+      outputLocs(partition) = newList
+      if (prevList != Nil && newList == Nil) {
+        becameUnavailable = true
+        numAvailableOutputs -= 1
+      }
+    }
+    if (becameUnavailable) {
+      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
+        this, execId, numAvailableOutputs, numPartitions, isAvailable))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1915e1/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 4cbc6e8..5d0ddb8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable.HashSet
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.CallSite
 
 /**
@@ -47,29 +46,23 @@ import org.apache.spark.util.CallSite
  * be updated for each attempt.
  *
  */
-private[spark] class Stage(
+private[spark] abstract class Stage(
     val id: Int,
     val rdd: RDD[_],
     val numTasks: Int,
-    val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
     val parents: List[Stage],
     val jobId: Int,
     val callSite: CallSite)
   extends Logging {
 
-  val isShuffleMap = shuffleDep.isDefined
   val numPartitions = rdd.partitions.size
-  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
-  var numAvailableOutputs = 0
 
   /** Set of jobs that this stage belongs to. */
   val jobIds = new HashSet[Int]
 
-  /** For stages that are the final (consists of only ResultTasks), link to the ActiveJob. */
-  var resultOfJob: Option[ActiveJob] = None
   var pendingTasks = new HashSet[Task[_]]
 
-  private var nextAttemptId = 0
+  private var nextAttemptId: Int = 0
 
   val name = callSite.shortForm
   val details = callSite.longForm
@@ -77,53 +70,6 @@ private[spark] class Stage(
   /** Pointer to the latest [StageInfo] object, set by DAGScheduler. */
   var latestInfo: StageInfo = StageInfo.fromStage(this)
 
-  def isAvailable: Boolean = {
-    if (!isShuffleMap) {
-      true
-    } else {
-      numAvailableOutputs == numPartitions
-    }
-  }
-
-  def addOutputLoc(partition: Int, status: MapStatus) {
-    val prevList = outputLocs(partition)
-    outputLocs(partition) = status :: prevList
-    if (prevList == Nil) {
-      numAvailableOutputs += 1
-    }
-  }
-
-  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
-    val prevList = outputLocs(partition)
-    val newList = prevList.filterNot(_.location == bmAddress)
-    outputLocs(partition) = newList
-    if (prevList != Nil && newList == Nil) {
-      numAvailableOutputs -= 1
-    }
-  }
-
-  /**
-   * Removes all shuffle outputs associated with this executor. Note that this will also remove
-   * outputs which are served by an external shuffle server (if one exists), as they are still
-   * registered with this execId.
-   */
-  def removeOutputsOnExecutor(execId: String) {
-    var becameUnavailable = false
-    for (partition <- 0 until numPartitions) {
-      val prevList = outputLocs(partition)
-      val newList = prevList.filterNot(_.location.executorId == execId)
-      outputLocs(partition) = newList
-      if (prevList != Nil && newList == Nil) {
-        becameUnavailable = true
-        numAvailableOutputs -= 1
-      }
-    }
-    if (becameUnavailable) {
-      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
-        this, execId, numAvailableOutputs, numPartitions, isAvailable))
-    }
-  }
-
   /** Return a new attempt id, starting with 0. */
   def newAttemptId(): Int = {
     val id = nextAttemptId
@@ -133,11 +79,8 @@ private[spark] class Stage(
 
   def attemptId: Int = nextAttemptId
 
-  override def toString: String = "Stage " + id
-
-  override def hashCode(): Int = id
-
-  override def equals(other: Any): Boolean = other match {
+  override final def hashCode(): Int = id
+  override final def equals(other: Any): Boolean = other match {
     case stage: Stage => stage != null && stage.id == id
     case _ => false
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ff1915e1/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index efd59a7..54500f7 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -54,7 +54,11 @@ object MimaExcludes {
             ProblemFilters.exclude[MissingClassProblem](
               "org.apache.spark.scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor")
           ) ++ Seq(
-          // SPARK-6510 Add a Graph#minus method acting as Set#difference
+            // SPARK-4655 - Making Stage an abstract class broke binary compatibility even though
+            // the stage class is defined as private[spark]
+            ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.scheduler.Stage")
+          ) ++ Seq(
+            // SPARK-6510 Add a Graph#minus method acting as Set#difference
             ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
           )
 

