You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/01/23 11:02:25 UTC
spark git commit: [SPARK-19146][CORE] Drop more elements when
stageData.taskData.size > retainedTasks
Repository: spark
Updated Branches:
refs/heads/master c4a6519c4 -> c99492141
[SPARK-19146][CORE] Drop more elements when stageData.taskData.size > retainedTasks
## What changes were proposed in this pull request?
Drop more elements when `stageData.taskData.size > retainedTasks` to reduce the number of calls to the drop function.
## How was this patch tested?
Jenkins
Author: Yuming Wang <wg...@gmail.com>
Closes #16527 from wangyum/SPARK-19146.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9949214
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9949214
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9949214
Branch: refs/heads/master
Commit: c99492141b1ddddb8edb6841a6e83748e5ba9bba
Parents: c4a6519
Author: Yuming Wang <wg...@gmail.com>
Authored: Mon Jan 23 11:02:22 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jan 23 11:02:22 2017 +0000
----------------------------------------------------------------------
.../spark/ui/jobs/JobProgressListener.scala | 14 +++++++--
.../ui/jobs/JobProgressListenerSuite.scala | 30 ++++++++++++++++++++
docs/configuration.md | 12 ++++----
3 files changed, 47 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c9949214/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 83dc5d8..e87caff 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -142,7 +142,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** If stages is too large, remove and garbage collect old stages */
private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
- val toRemove = (stages.size - retainedStages)
+ val toRemove = calculateNumberToRemove(stages.size, retainedStages)
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
@@ -154,7 +154,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
/** If jobs is too large, remove and garbage collect old jobs */
private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
if (jobs.size > retainedJobs) {
- val toRemove = (jobs.size - retainedJobs)
+ val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
jobs.take(toRemove).foreach { job =>
// Remove the job's UI data, if it exists
jobIdToData.remove(job.jobId).foreach { removedJob =>
@@ -409,7 +409,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// If Tasks is too large, remove and garbage collect old tasks
if (stageData.taskData.size > retainedTasks) {
- stageData.taskData = stageData.taskData.drop(stageData.taskData.size - retainedTasks)
+ stageData.taskData = stageData.taskData.drop(
+ calculateNumberToRemove(stageData.taskData.size, retainedTasks))
}
for (
@@ -431,6 +432,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
}
/**
+ * Remove at least (retainedSize / 10) items to reduce friction.
+ */
+ private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
+ math.max(retainedSize / 10, dataSize - retainedSize)
+ }
+
+ /**
* Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
* aggregate metrics by calculating deltas between the currently recorded metrics and the new
* metrics.
http://git-wip-us.apache.org/repos/asf/spark/blob/c9949214/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index da853f1..e3127da 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -408,4 +408,34 @@ class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with
val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
assert(newTaskInfo.accumulables === Seq(userAccum))
}
+
+ test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
+ val conf = new SparkConf()
+ conf.set("spark.ui.retainedTasks", "100")
+ val taskMetrics = TaskMetrics.empty
+ taskMetrics.mergeShuffleReadMetrics()
+ val task = new ShuffleMapTask(0)
+ val taskType = Utils.getFormattedClassName(task)
+
+ val listener1 = new JobProgressListener(conf)
+ for (t <- 1 to 101) {
+ val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+ taskInfo.finishTime = 1
+ listener1.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ }
+ // 101 - math.max(100 / 10, 101 - 100) = 91
+ assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
+
+ val listener2 = new JobProgressListener(conf)
+ for (t <- 1 to 150) {
+ val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
+ taskInfo.finishTime = 1
+ listener2.onTaskEnd(
+ SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
+ }
+ // 150 - math.max(100 / 10, 150 - 100) = 100
+ assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c9949214/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 7a11a98..a6b1f15 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -684,24 +684,24 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.ui.retainedJobs</code></td>
<td>1000</td>
<td>
- How many jobs the Spark UI and status APIs remember before garbage
- collecting.
+ How many jobs the Spark UI and status APIs remember before garbage collecting.
+ This is a target maximum, and fewer elements may be retained in some circumstances.
</td>
</tr>
<tr>
<td><code>spark.ui.retainedStages</code></td>
<td>1000</td>
<td>
- How many stages the Spark UI and status APIs remember before garbage
- collecting.
+ How many stages the Spark UI and status APIs remember before garbage collecting.
+ This is a target maximum, and fewer elements may be retained in some circumstances.
</td>
</tr>
<tr>
<td><code>spark.ui.retainedTasks</code></td>
<td>100000</td>
<td>
- How many tasks the Spark UI and status APIs remember before garbage
- collecting.
+ How many tasks the Spark UI and status APIs remember before garbage collecting.
+ This is a target maximum, and fewer elements may be retained in some circumstances.
</td>
</tr>
<tr>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org