You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/06 20:50:09 UTC
[08/12] git commit: Synchronous, inline cleanup after runLocally
Synchronous, inline cleanup after runLocally
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f55d0b93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f55d0b93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f55d0b93
Branch: refs/heads/master
Commit: f55d0b935d7c148f49b15932938e91150b64466f
Parents: c9fcd90
Author: Mark Hamstra <ma...@gmail.com>
Authored: Tue Nov 26 14:06:59 2013 -0800
Committer: Mark Hamstra <ma...@gmail.com>
Committed: Tue Dec 3 09:57:32 2013 -0800
----------------------------------------------------------------------
.../org/apache/spark/scheduler/DAGScheduler.scala | 15 ++++++---------
.../apache/spark/scheduler/DAGSchedulerEvent.scala | 2 --
.../apache/spark/scheduler/DAGSchedulerSuite.scala | 2 --
3 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b371a24..b849867 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -597,14 +597,6 @@ class DAGScheduler(
listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
- case LocalJobCompleted(job, result) =>
- val stage = job.finalStage
- stageIdToJobIds -= stage.id // clean up data structures that were populated for a local job,
- stageIdToStage -= stage.id // but that won't get cleaned up via the normal paths through
- stageToInfos -= stage // completion events or stage abort
- jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job, result))
-
case TaskSetFailed(taskSet, reason) =>
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
@@ -691,7 +683,12 @@ class DAGScheduler(
jobResult = JobFailed(e, Some(job.finalStage))
job.listener.jobFailed(e)
} finally {
- eventProcessActor ! LocalJobCompleted(job, jobResult)
+ val s = job.finalStage
+ stageIdToJobIds -= s.id // clean up data structures that were populated for a local job,
+ stageIdToStage -= s.id // but that won't get cleaned up via the normal paths through
+ stageToInfos -= s // completion events or stage abort
+ jobIdToStageIds -= job.jobId
+ listenerBus.post(SparkListenerJobEnd(job, jobResult))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index aa496b7..add1187 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent(
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent
-private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
-
private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8ce8c68..706d84a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
}
val jobId = scheduler.nextJobId.getAndIncrement()
runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
- assert(scheduler.stageToInfos.size === 1)
- runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head))
assert(results === Map(0 -> 42))
assertDataStructuresEmpty
}