You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/04/02 19:36:11 UTC
git commit: Renamed stageIdToActiveJob to jobIdToActiveJob.
Repository: spark
Updated Branches:
refs/heads/master ea9de658a -> 11973a7bd
Renamed stageIdToActiveJob to jobIdToActiveJob.
This data structure was misused and, as a result, was later given an incorrect name.
This data structure seems to have gotten into this tangled state as a result of @henrydavidge using the stage ID instead of the job ID to index into it, and @andrewor14 later renaming the data structure to reflect this misunderstanding.
This patch renames it and removes an incorrect indexing into it. The incorrect indexing into it meant that the code added by @henrydavidge to warn when a task size is too large (added here https://github.com/apache/spark/commit/57579934f0454f258615c10e69ac2adafc5b9835) was not always executed; this commit fixes that.
Author: Kay Ousterhout <ka...@gmail.com>
Closes #301 from kayousterhout/fixCancellation and squashes the following commits:
bd3d3a4 [Kay Ousterhout] Renamed stageIdToActiveJob to jobIdToActiveJob.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/11973a7b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/11973a7b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/11973a7b
Branch: refs/heads/master
Commit: 11973a7bdad58fdb759033c232d87f0b279c83b4
Parents: ea9de65
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Wed Apr 2 10:35:52 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Apr 2 10:35:52 2014 -0700
----------------------------------------------------------------------
.../apache/spark/scheduler/DAGScheduler.scala | 21 ++++++++++----------
.../spark/scheduler/DAGSchedulerSuite.scala | 2 +-
2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/11973a7b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 4fce47e..ef3d24d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -84,7 +84,7 @@ class DAGScheduler(
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
- private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+ private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
@@ -536,7 +536,7 @@ class DAGScheduler(
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
- stageIdToActiveJob(jobId) = job
+ jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(
@@ -559,7 +559,7 @@ class DAGScheduler(
// Cancel all running jobs.
runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
- stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
+ jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
case ExecutorAdded(execId, host) =>
handleExecutorAdded(execId, host)
@@ -569,7 +569,6 @@ class DAGScheduler(
case BeginEvent(task, taskInfo) =>
for (
- job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
@@ -697,7 +696,7 @@ class DAGScheduler(
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
- jobsThatUseStage.find(stageIdToActiveJob.contains)
+ jobsThatUseStage.find(jobIdToActiveJob.contains)
} else {
None
}
@@ -750,8 +749,8 @@ class DAGScheduler(
}
}
- val properties = if (stageIdToActiveJob.contains(jobId)) {
- stageIdToActiveJob(stage.jobId).properties
+ val properties = if (jobIdToActiveJob.contains(jobId)) {
+ jobIdToActiveJob(stage.jobId).properties
} else {
// this stage will be assigned to "default" pool
null
@@ -827,7 +826,7 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- stageIdToActiveJob -= stage.jobId
+ jobIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
@@ -986,11 +985,11 @@ class DAGScheduler(
val independentStages = removeJobAndIndependentStages(jobId)
independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
- val job = stageIdToActiveJob(jobId)
+ val job = jobIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
- stageIdToActiveJob -= jobId
+ jobIdToActiveJob -= jobId
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
@@ -1011,7 +1010,7 @@ class DAGScheduler(
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
- stageIdToActiveJob -= resultStage.jobId
+ jobIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
http://git-wip-us.apache.org/repos/asf/spark/blob/11973a7b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index c97543f..ce567b0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -428,7 +428,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
assert(scheduler.pendingTasks.isEmpty)
assert(scheduler.activeJobs.isEmpty)
assert(scheduler.failedStages.isEmpty)
- assert(scheduler.stageIdToActiveJob.isEmpty)
+ assert(scheduler.jobIdToActiveJob.isEmpty)
assert(scheduler.jobIdToStageIds.isEmpty)
assert(scheduler.stageIdToJobIds.isEmpty)
assert(scheduler.stageIdToStage.isEmpty)