You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/05/14 06:04:16 UTC
spark git commit: [HOT FIX #6125] Do not wait for all stages to start rendering
Repository: spark
Updated Branches:
refs/heads/master 728af88cf -> 3113da9c7
[HOT FIX #6125] Do not wait for all stages to start rendering
zsxwing
Author: Andrew Or <an...@databricks.com>
Closes #6138 from andrewor14/dag-viz-clean-properly and squashes the following commits:
19d4e98 [Andrew Or] Add synchronize
02542d6 [Andrew Or] Rename overloaded variable
d11bee1 [Andrew Or] Don't wait until all stages have started before rendering
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3113da9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3113da9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3113da9c
Branch: refs/heads/master
Commit: 3113da9c7067bbf90639866ae9d946f02cc484ff
Parents: 728af88
Author: Andrew Or <an...@databricks.com>
Authored: Wed May 13 21:04:13 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed May 13 21:04:13 2015 -0700
----------------------------------------------------------------------
.../ui/scope/RDDOperationGraphListener.scala | 34 +++++++++++---------
.../scope/RDDOperationGraphListenerSuite.scala | 1 -
2 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3113da9c/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index 3b77a1e..aa9c25c 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -41,11 +41,11 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
/** Return the graph metadata for the given stage, or None if no such information exists. */
- def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
- val stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
- val graphs = stageIds.flatMap { sid => stageIdToGraph.get(sid) }
+ def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
+ val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
+ val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
// If the metadata for some stages have been removed, do not bother rendering this job
- if (stageIds.size != graphs.size) {
+ if (_stageIds.size != graphs.size) {
Seq.empty
} else {
graphs
@@ -53,16 +53,29 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
}
/** Return the graph metadata for the given stage, or None if no such information exists. */
- def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
+ def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = synchronized {
stageIdToGraph.get(stageId)
}
/** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobId = jobStart.jobId
+ val stageInfos = jobStart.stageInfos
+
jobIds += jobId
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
+ stageInfos.foreach { stageInfo =>
+ stageIds += stageInfo.stageId
+ stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+ // Remove state for old stages
+ if (stageIds.size >= retainedStages) {
+ val toRemove = math.max(retainedStages / 10, 1)
+ stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
+ stageIds.trimStart(toRemove)
+ }
+ }
+
// Remove state for old jobs
if (jobIds.size >= retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
@@ -71,15 +84,4 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
}
}
- /** Remove graph metadata for old stages */
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
- val stageInfo = stageSubmitted.stageInfo
- stageIds += stageInfo.stageId
- stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
- if (stageIds.size >= retainedStages) {
- val toRemove = math.max(retainedStages / 10, 1)
- stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
- stageIds.trimStart(toRemove)
- }
- }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/3113da9c/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
index 619b38a..c659fc1 100644
--- a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
@@ -31,7 +31,6 @@ class RDDOperationGraphListenerSuite extends FunSuite {
assert(numStages > 0, "I will not run a job with 0 stages for you.")
val stageInfos = (0 until numStages).map { _ =>
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
- listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
stageIdCounter += 1
stageInfo
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org