You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/06 20:50:09 UTC

[08/12] git commit: Synchronous, inline cleanup after runLocally

Synchronous, inline cleanup after runLocally


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f55d0b93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f55d0b93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f55d0b93

Branch: refs/heads/master
Commit: f55d0b935d7c148f49b15932938e91150b64466f
Parents: c9fcd90
Author: Mark Hamstra <ma...@gmail.com>
Authored: Tue Nov 26 14:06:59 2013 -0800
Committer: Mark Hamstra <ma...@gmail.com>
Committed: Tue Dec 3 09:57:32 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/DAGScheduler.scala    | 15 ++++++---------
 .../apache/spark/scheduler/DAGSchedulerEvent.scala   |  2 --
 .../apache/spark/scheduler/DAGSchedulerSuite.scala   |  2 --
 3 files changed, 6 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index b371a24..b849867 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -597,14 +597,6 @@ class DAGScheduler(
         listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
         handleTaskCompletion(completion)
 
-      case LocalJobCompleted(job, result) =>
-        val stage = job.finalStage
-        stageIdToJobIds -= stage.id    // clean up data structures that were populated for a local job,
-        stageIdToStage -= stage.id     // but that won't get cleaned up via the normal paths through
-        stageToInfos -= stage          // completion events or stage abort
-        jobIdToStageIds -= job.jobId
-        listenerBus.post(SparkListenerJobEnd(job, result))
-
       case TaskSetFailed(taskSet, reason) =>
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
 
@@ -691,7 +683,12 @@ class DAGScheduler(
         jobResult = JobFailed(e, Some(job.finalStage))
         job.listener.jobFailed(e)
     } finally {
-      eventProcessActor ! LocalJobCompleted(job, jobResult)
+      val s = job.finalStage
+      stageIdToJobIds -= s.id    // clean up data structures that were populated for a local job,
+      stageIdToStage -= s.id     // but that won't get cleaned up via the normal paths through
+      stageToInfos -= s          // completion events or stage abort
+      jobIdToStageIds -= job.jobId
+      listenerBus.post(SparkListenerJobEnd(job, jobResult))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index aa496b7..add1187 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,8 +65,6 @@ private[scheduler] case class CompletionEvent(
     taskMetrics: TaskMetrics)
   extends DAGSchedulerEvent
 
-private[scheduler] case class LocalJobCompleted(job: ActiveJob, result: JobResult) extends DAGSchedulerEvent
-
 private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
 
 private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f55d0b93/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 8ce8c68..706d84a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -219,8 +219,6 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
     runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, listener))
-    assert(scheduler.stageToInfos.size === 1)
-    runEvent(LocalJobCompleted(scheduler.stageToInfos.keys.head))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }