You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/02/24 20:02:51 UTC
spark git commit: [Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds
Repository: spark
Updated Branches:
refs/heads/master 201236628 -> 64d2c01ff
[Spark-5967] [UI] Correctly clean JobProgressListener.stageIdToActiveJobIds
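Patch should be self-explanatory: when a job ends, any of its stages whose set of active jobs becomes empty is now also removed from stageIdToActiveJobIds.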
pwendell JoshRosen
Author: Tathagata Das <ta...@gmail.com>
Closes #4741 from tdas/SPARK-5967 and squashes the following commits:
653b5bb [Tathagata Das] Fixed the fix and added test
e2de972 [Tathagata Das] Clear stages which have no corresponding active jobs.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64d2c01f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64d2c01f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64d2c01f
Branch: refs/heads/master
Commit: 64d2c01ff1048de83b9b8efce987b55e457298f9
Parents: 2012366
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Feb 24 11:02:47 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Feb 24 11:02:47 2015 -0800
----------------------------------------------------------------------
.../spark/ui/jobs/JobProgressListener.scala | 3 +++
.../ui/jobs/JobProgressListenerSuite.scala | 22 ++++++++++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/64d2c01f/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 0b6fe70..937d95a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
for (stageId <- jobData.stageIds) {
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
jobsUsingStage.remove(jobEnd.jobId)
+ if (jobsUsingStage.isEmpty) {
+ stageIdToActiveJobIds.remove(stageId)
+ }
stageIdToInfo.get(stageId).foreach { stageInfo =>
if (stageInfo.submissionTime.isEmpty) {
// if this stage is pending, it won't complete, so mark it as "skipped":
http://git-wip-us.apache.org/repos/asf/spark/blob/64d2c01f/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 6019282..730a4b5 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
}
+ test("test clearing of stageIdToActiveJobs") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedStages", 5.toString)
+ val listener = new JobProgressListener(conf)
+ val jobId = 0
+ val stageIds = 1 to 50
+ // Start a job with 50 stages
+ listener.onJobStart(createJobStartEvent(jobId, stageIds))
+ for (stageId <- stageIds) {
+ listener.onStageSubmitted(createStageStartEvent(stageId))
+ }
+ listener.stageIdToActiveJobIds.size should be > 0
+
+ // Complete the stages and job
+ for (stageId <- stageIds) {
+ listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
+ }
+ listener.onJobEnd(createJobEndEvent(jobId, false))
+ assertActiveJobsStateIsEmpty(listener)
+ listener.stageIdToActiveJobIds.size should be (0)
+ }
+
test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org