Posted to commits@spark.apache.org by sr...@apache.org on 2017/01/23 11:02:25 UTC

spark git commit: [SPARK-19146][CORE] Drop more elements when stageData.taskData.size > retainedTasks

Repository: spark
Updated Branches:
  refs/heads/master c4a6519c4 -> c99492141


[SPARK-19146][CORE] Drop more elements when stageData.taskData.size > retainedTasks

## What changes were proposed in this pull request?

Drop more elements when `stageData.taskData.size > retainedTasks`, to reduce how often the drop function is called: each trim now removes at least one tenth of the retained cap instead of only the overflow.
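
For illustration, here is a minimal standalone sketch of the new trimming rule (the `TrimSketch` object and its names are invented for this example, not part of the patch). Dropping at least `retained / 10` elements per pass means the drop runs roughly once per `retained / 10` insertions instead of once per insertion past the cap:

```scala
import scala.collection.mutable.ListBuffer

object TrimSketch {
  // Same formula as the patch: drop at least 10% of the cap in one batch.
  def numberToRemove(dataSize: Int, retainedSize: Int): Int =
    math.max(retainedSize / 10, dataSize - retainedSize)

  def main(args: Array[String]): Unit = {
    val retained = 100
    val buffer = ListBuffer.tabulate(101)(identity) // one element over the cap
    buffer.remove(0, numberToRemove(buffer.size, retained)) // drops 10, not just 1
    println(buffer.size) // 91
  }
}
```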

## How was this patch tested?

Jenkins

Author: Yuming Wang <wg...@gmail.com>

Closes #16527 from wangyum/SPARK-19146.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9949214
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9949214
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9949214

Branch: refs/heads/master
Commit: c99492141b1ddddb8edb6841a6e83748e5ba9bba
Parents: c4a6519
Author: Yuming Wang <wg...@gmail.com>
Authored: Mon Jan 23 11:02:22 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jan 23 11:02:22 2017 +0000

----------------------------------------------------------------------
 .../spark/ui/jobs/JobProgressListener.scala     | 14 +++++++--
 .../ui/jobs/JobProgressListenerSuite.scala      | 30 ++++++++++++++++++++
 docs/configuration.md                           | 12 ++++----
 3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c9949214/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d8..e87caff 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If stages is too large, remove and garbage collect old stages */
   private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
-      val toRemove = (stages.size - retainedStages)
+      val toRemove = calculateNumberToRemove(stages.size, retainedStages)
       stages.take(toRemove).foreach { s =>
         stageIdToData.remove((s.stageId, s.attemptId))
         stageIdToInfo.remove(s.stageId)
@@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   /** If jobs is too large, remove and garbage collect old jobs */
   private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
     if (jobs.size > retainedJobs) {
-      val toRemove = (jobs.size - retainedJobs)
+      val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
       jobs.take(toRemove).foreach { job =>
         // Remove the job's UI data, if it exists
         jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
       // If Tasks is too large, remove and garbage collect old tasks
       if (stageData.taskData.size > retainedTasks) {
-        stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+        stageData.taskData = stageData.taskData.drop(
+          calculateNumberToRemove(stageData.taskData.size, retainedTasks))
       }
 
       for (
@@ -431,6 +432,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   }
 
   /**
+   * Remove at least (retainedSize / 10) items so that trimming runs in infrequent batches rather than on every insertion.
+   */
+  private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+    math.max(retainedSize / 10, dataSize - retainedSize)
+  }
+
+  /**
    * Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
    * aggregate metrics by calculating deltas between the currently recorded metrics and the new
    * metrics.

http://git-wip-us.apache.org/repos/asf/spark/blob/c9949214/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1..e3127da 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,34 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
     val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
     assert(newTaskInfo.accumulables === Seq(userAccum))
   }
+
+  test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+    val conf = new SparkConf()
+    conf.set("spark.ui.retainedTasks", "100")
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.mergeShuffleReadMetrics()
+    val task = new ShuffleMapTask(0)
+    val taskType = Utils.getFormattedClassName(task)
+
+    val listener1 = new JobProgressListener(conf)
+    for (t <- 1 to 101) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener1.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // 101 - math.max(100 / 10, 101 - 100) = 91
+    assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
+
+    val listener2 = new JobProgressListener(conf)
+    for (t <- 1 to 150) {
+      val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+      taskInfo.finishTime = 1
+      listener2.onTaskEnd(
+        SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+    }
+    // tasks are trimmed in batches of 10 whenever the count exceeds 100, so 150 additions leave 100
+    assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
+  }
+
 }
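
To make the second assertion above concrete, here is a minimal simulation of the incremental behavior (a plain `ListBuffer[Int]` stands in for `taskData`; the object name is illustrative). Trims fire at sizes 101, 111, 121, 131, and 141, each dropping 10 elements, so the 150th insertion leaves 100:

```scala
import scala.collection.mutable.ListBuffer

object IncrementalTrimDemo {
  def main(args: Array[String]): Unit = {
    val retained = 100
    val buf = ListBuffer.empty[Int]
    for (t <- 1 to 150) {
      buf += t
      // Mirrors the listener: trim only once the cap is exceeded.
      if (buf.size > retained) {
        buf.remove(0, math.max(retained / 10, buf.size - retained))
      }
    }
    println(buf.size) // 100
  }
}
```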

http://git-wip-us.apache.org/repos/asf/spark/blob/c9949214/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7a11a98..a6b1f15 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -684,24 +684,24 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.ui.retainedJobs</code></td>
   <td>1000</td>
   <td>
-    How many jobs the Spark UI and status APIs remember before garbage
-    collecting.
+    How many jobs the Spark UI and status APIs remember before garbage collecting. 
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
   <td><code>spark.ui.retainedStages</code></td>
   <td>1000</td>
   <td>
-    How many stages the Spark UI and status APIs remember before garbage
-    collecting.
+    How many stages the Spark UI and status APIs remember before garbage collecting. 
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>
   <td><code>spark.ui.retainedTasks</code></td>
   <td>100000</td>
   <td>
-    How many tasks the Spark UI and status APIs remember before garbage
-    collecting.
+    How many tasks the Spark UI and status APIs remember before garbage collecting. 
+    This is a target maximum, and fewer elements may be retained in some circumstances.
   </td>
 </tr>
 <tr>


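The three properties documented above can be set like any other Spark configuration; a small sketch (values are arbitrary examples, not recommendations):

```scala
import org.apache.spark.SparkConf

object RetainedConfExample {
  def main(args: Array[String]): Unit = {
    // After this change, actual retained counts may dip below these
    // targets, since each trim removes at least 10% of the cap.
    val conf = new SparkConf()
      .set("spark.ui.retainedJobs", "500")
      .set("spark.ui.retainedStages", "500")
      .set("spark.ui.retainedTasks", "10000")
    println(conf.get("spark.ui.retainedTasks")) // 10000
  }
}
```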