You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/02/24 20:02:51 UTC

spark git commit: [Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds

Repository: spark
Updated Branches:
  refs/heads/master 201236628 -> 64d2c01ff


[Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds

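Patch should be self-explanatory: when a job ends, a stage's entry is now removed from stageIdToActiveJobIds once no active job references that stage, so empty entries are no longer left behind in the map.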
pwendell JoshRosen

Author: Tathagata Das <ta...@gmail.com>

Closes #4741 from tdas/SPARK-5967 and squashes the following commits:

653b5bb [Tathagata Das] Fixed the fix and added test
e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64d2c01f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64d2c01f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64d2c01f

Branch: refs/heads/master
Commit: 64d2c01ff1048de83b9b8efce987b55e457298f9
Parents: 2012366
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Feb 24 11:02:47 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Feb 24 11:02:47 2015 -0800

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     |  3 +++
 .../ui/jobs/JobProgressListenerSuite.scala      | 22 ++++++++++++++++++++
 2 files changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64d2c01f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0b6fe70..937d95a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     for (stageId <- jobData.stageIds) {
       stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
         jobsUsingStage.remove(jobEnd.jobId)
+        if (jobsUsingStage.isEmpty) {
+          stageIdToActiveJobIds.remove(stageId)
+        }
         stageIdToInfo.get(stageId).foreach { stageInfo =>
           if (stageInfo.submissionTime.isEmpty) {
             // if this stage is pending, it won't complete, so mark it as "skipped":

http://git-wip-us.apache.org/repos/asf/spark/blob/64d2c01f/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6019282..730a4b5 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
   }
 
+  test("test clearing of stageIdToActiveJobs") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedStages", 5.toString)
+    val listener = new JobProgressListener(conf)
+    val jobId = 0
+    val stageIds = 1 to 50
+    // Start a job with 50 stages
+    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    for (stageId <- stageIds) {
+      listener.onStageSubmitted(createStageStartEvent(stageId))
+    }
+    listener.stageIdToActiveJobIds.size should be > 0
+
+    // Complete the stages and job
+    for (stageId <- stageIds) {
+      listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
+    }
+    listener.onJobEnd(createJobEndEvent(jobId, false))
+    assertActiveJobsStateIsEmpty(listener)
+    listener.stageIdToActiveJobIds.size should be (0)
+  }
+
   test("test LRU eviction of jobs") {
     val conf = new SparkConf()
     conf.set("spark.ui.retainedStages", 5.toString)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org