You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/05/14 01:27:56 UTC

spark git commit: [SPARK-7608] Clean up old state in RDDOperationGraphListener

Repository: spark
Updated Branches:
  refs/heads/master e683182c3 -> f6e18388d


[SPARK-7608] Clean up old state in RDDOperationGraphListener

This is necessary for streaming and long-running Spark applications. zsxwing tdas

Author: Andrew Or <an...@databricks.com>

Closes #6125 from andrewor14/viz-listener-leak and squashes the following commits:

8660949 [Andrew Or] Fix thing + add tests
33c0843 [Andrew Or] Clean up old job state


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6e18388
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6e18388
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6e18388

Branch: refs/heads/master
Commit: f6e18388d993d99f768c6d547327e0720ec64224
Parents: e683182
Author: Andrew Or <an...@databricks.com>
Authored: Wed May 13 16:27:48 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Wed May 13 16:27:48 2015 -0700

----------------------------------------------------------------------
 .../ui/scope/RDDOperationGraphListener.scala    | 30 +++++--
 .../scope/RDDOperationGraphListenerSuite.scala  | 87 ++++++++++++++++++++
 2 files changed, 108 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f6e18388/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
index 2884a49..f0f7007 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala
@@ -27,11 +27,16 @@ import org.apache.spark.ui.SparkUI
  * A SparkListener that constructs a DAG of RDD operations.
  */
 private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
-  private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
-  private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
-  private val stageIds = new mutable.ArrayBuffer[Int]
+  private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
+  private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
+
+  // Keep track of the order in which these are inserted so we can remove old ones
+  private[ui] val jobIds = new mutable.ArrayBuffer[Int]
+  private[ui] val stageIds = new mutable.ArrayBuffer[Int]
 
   // How many jobs or stages to retain graph metadata for
+  private val retainedJobs =
+    conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
   private val retainedStages =
     conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
 
@@ -50,15 +55,22 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
   /** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
     val jobId = jobStart.jobId
-    val stageInfos = jobStart.stageInfos
+    jobIds += jobId
+    jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
 
-    stageInfos.foreach { stageInfo =>
-      stageIds += stageInfo.stageId
-      stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
+    // Remove state for old jobs
+    if (jobIds.size >= retainedJobs) {
+      val toRemove = math.max(retainedJobs / 10, 1)
+      jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
+      jobIds.trimStart(toRemove)
     }
-    jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
+  }
 
-    // Remove graph metadata for old stages
+  /** On stage submit, record the stage's graph and remove graph metadata for old stages. */
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
+    val stageInfo = stageSubmitted.stageInfo
+    stageIds += stageInfo.stageId
+    stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
     if (stageIds.size >= retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
       stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }

http://git-wip-us.apache.org/repos/asf/spark/blob/f6e18388/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
new file mode 100644
index 0000000..619b38a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphListenerSuite.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.scope
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListenerStageSubmitted, StageInfo}
+
+class RDDOperationGraphListenerSuite extends FunSuite {
+  private var jobIdCounter = 0
+  private var stageIdCounter = 0
+
+  /** Run a job with the specified number of stages. */
+  private def runOneJob(numStages: Int, listener: RDDOperationGraphListener): Unit = {
+    assert(numStages > 0, "I will not run a job with 0 stages for you.")
+    val stageInfos = (0 until numStages).map { _ =>
+      val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
+      listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
+      stageIdCounter += 1
+      stageInfo
+    }
+    listener.onJobStart(new SparkListenerJobStart(jobIdCounter, 0, stageInfos))
+    jobIdCounter += 1
+  }
+
+  test("listener cleans up metadata") {
+
+    val conf = new SparkConf()
+      .set("spark.ui.retainedStages", "10")
+      .set("spark.ui.retainedJobs", "10")
+
+    val listener = new RDDOperationGraphListener(conf)
+    assert(listener.jobIdToStageIds.isEmpty)
+    assert(listener.stageIdToGraph.isEmpty)
+    assert(listener.jobIds.isEmpty)
+    assert(listener.stageIds.isEmpty)
+
+    // Run a few jobs, but not enough for clean up yet
+    runOneJob(1, listener)
+    runOneJob(2, listener)
+    runOneJob(3, listener)
+    assert(listener.jobIdToStageIds.size === 3)
+    assert(listener.stageIdToGraph.size === 6)
+    assert(listener.jobIds.size === 3)
+    assert(listener.stageIds.size === 6)
+
+    // Run a few more; this time the stages should be cleaned up, but not the jobs
+    runOneJob(5, listener)
+    runOneJob(100, listener)
+    assert(listener.jobIdToStageIds.size === 5)
+    assert(listener.stageIdToGraph.size === 9)
+    assert(listener.jobIds.size === 5)
+    assert(listener.stageIds.size === 9)
+
+    // Run a few more, but this time both jobs and stages should be cleaned up
+    (1 to 100).foreach { _ =>
+      runOneJob(1, listener)
+    }
+    assert(listener.jobIdToStageIds.size === 9)
+    assert(listener.stageIdToGraph.size === 9)
+    assert(listener.jobIds.size === 9)
+    assert(listener.stageIds.size === 9)
+
+    // Ensure we clean up old jobs and stages, not arbitrary ones
+    assert(!listener.jobIdToStageIds.contains(0))
+    assert(!listener.stageIdToGraph.contains(0))
+    assert(!listener.stageIds.contains(0))
+    assert(!listener.jobIds.contains(0))
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org