You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/05 10:41:58 UTC
spark git commit: [SPARK-23307][WEBUI] Sort jobs/stages/tasks/queries
by completion timestamp before cleaning them up
Repository: spark
Updated Branches:
refs/heads/master 6fb3fd153 -> a6bf3db20
[SPARK-23307][WEBUI] Sort jobs/stages/tasks/queries by completion timestamp before cleaning them up
## What changes were proposed in this pull request?
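Sort jobs/stages/tasks/queries by their completion timestamp before cleaning them up, so that the eviction behavior is consistent with Spark 2.2: when a retention limit is exceeded, the entries that completed earliest are removed first.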
## How was this patch tested?
- Jenkins.
- Manually ran the following code with the settings below, and checked the UI for jobs/stages/tasks/queries.
```
spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.sql.ui.retainedExecutions 10
spark.ui.retainedTasks 10
```
```
new Thread() {
override def run() {
spark.range(1, 2).foreach { i =>
Thread.sleep(10000)
}
}
}.start()
Thread.sleep(5000)
for (_ <- 1 to 20) {
new Thread() {
override def run() {
spark.range(1, 2).foreach { i =>
}
}
}.start()
}
Thread.sleep(15000)
spark.range(1, 2).foreach { i =>
}
sc.makeRDD(1 to 100, 100).foreach { i =>
}
```
Author: Shixiong Zhu <zs...@gmail.com>
Closes #20481 from zsxwing/SPARK-23307.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6bf3db2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6bf3db2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6bf3db2
Branch: refs/heads/master
Commit: a6bf3db20773ba65cbc4f2775db7bd215e78829a
Parents: 6fb3fd1
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Mon Feb 5 18:41:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Feb 5 18:41:49 2018 +0800
----------------------------------------------------------------------
.../apache/spark/status/AppStatusListener.scala | 13 +--
.../org/apache/spark/status/storeTypes.scala | 7 ++
.../spark/status/AppStatusListenerSuite.scala | 90 ++++++++++++++++++++
.../sql/execution/ui/SQLAppStatusListener.scala | 4 +-
.../sql/execution/ui/SQLAppStatusStore.scala | 9 +-
.../ui/SQLAppStatusListenerSuite.scala | 45 ++++++++++
6 files changed, 158 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 3e34bdc..ab01cdd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -875,8 +875,8 @@ private[spark] class AppStatusListener(
return
}
- val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
- countToDelete.toInt) { j =>
+ val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
@@ -888,8 +888,8 @@ private[spark] class AppStatusListener(
return
}
- val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
- countToDelete.toInt) { s =>
+ val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+ val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}
@@ -945,8 +945,9 @@ private[spark] class AppStatusListener(
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
- val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
- .last(stageKey)
+ val view = kvstore.view(classOf[TaskDataWrapper])
+ .index(TaskIndexNames.COMPLETION_TIME)
+ .parent(stageKey)
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index c9cb996..412644d 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
@JsonIgnore @KVIndex
private def id: Int = info.jobId
+ @JsonIgnore @KVIndex("completionTime")
+ private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}
private[spark] class StageDataWrapper(
@@ -90,6 +92,8 @@ private[spark] class StageDataWrapper(
@JsonIgnore @KVIndex("active")
private def active: Boolean = info.status == StageStatus.ACTIVE
+ @JsonIgnore @KVIndex("completionTime")
+ private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
}
/**
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
final val STAGE = "stage"
final val STATUS = "sta"
final val TASK_INDEX = "idx"
+ final val COMPLETION_TIME = "ct"
}
/**
@@ -337,6 +342,8 @@ private[spark] class TaskDataWrapper(
@JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE)
private def error: String = if (errorMessage.isDefined) errorMessage.get else ""
+ @JsonIgnore @KVIndex(value = TaskIndexNames.COMPLETION_TIME, parent = TaskIndexNames.STAGE)
+ private def completionTime: Long = launchTime + duration
}
private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 042bba7..b74d6ee 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1010,6 +1010,96 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
+ test("eviction should respect job completion time") {
+ val testConf = conf.clone().set(MAX_RETAINED_JOBS, 2)
+ val listener = new AppStatusListener(store, testConf, true)
+
+ // Start job 1 and job 2
+ time += 1
+ listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
+ time += 1
+ listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
+
+ // Stop job 2 before job 1
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+ // Start job 3 and job 2 should be evicted.
+ time += 1
+ listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
+ assert(store.count(classOf[JobDataWrapper]) === 2)
+ intercept[NoSuchElementException] {
+ store.read(classOf[JobDataWrapper], 2)
+ }
+ }
+
+ test("eviction should respect stage completion time") {
+ val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
+ val listener = new AppStatusListener(store, testConf, true)
+
+ val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+ val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+ val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+ // Start stage 1 and stage 2
+ time += 1
+ stage1.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+ time += 1
+ stage2.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+ // Stop stage 2 before stage 1
+ time += 1
+ stage2.completionTime = Some(time)
+ listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+ time += 1
+ stage1.completionTime = Some(time)
+ listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+ // Start stage 3 and stage 2 should be evicted.
+ stage3.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
+ assert(store.count(classOf[StageDataWrapper]) === 2)
+ intercept[NoSuchElementException] {
+ store.read(classOf[StageDataWrapper], Array(2, 0))
+ }
+ }
+
+ test("eviction should respect task completion time") {
+ val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
+ val listener = new AppStatusListener(store, testConf, true)
+
+ val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+ stage1.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+
+ // Start task 1 and task 2
+ val tasks = createTasks(3, Array("1"))
+ tasks.take(2).foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+ }
+
+ // Stop task 2 before task 1
+ time += 1
+ tasks(1).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(
+ SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+ // Start task 3 and task 2 should be evicted.
+ listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
+ assert(store.count(classOf[TaskDataWrapper]) === 2)
+ intercept[NoSuchElementException] {
+ store.read(classOf[TaskDataWrapper], tasks(1).id)
+ }
+ }
+
test("driver logs") {
val listener = new AppStatusListener(store, conf, true)
http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 73a1052..53fb9a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -332,8 +332,8 @@ class SQLAppStatusListener(
return
}
- val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]),
- countToDelete.toInt) { e => e.completionTime.isDefined }
+ val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
+ val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 910f2e5..9a76584 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -23,11 +23,12 @@ import java.util.Date
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.spark.JobExecutionStatus
import org.apache.spark.status.KVUtils.KVIndexParam
-import org.apache.spark.util.kvstore.KVStore
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
/**
* Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
@@ -90,7 +91,11 @@ class SQLExecutionUIData(
* from the SQL listener instance.
*/
@JsonDeserialize(keyAs = classOf[JLong])
- val metricValues: Map[Long, String])
+ val metricValues: Map[Long, String]) {
+
+ @JsonIgnore @KVIndex("completionTime")
+ private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
+}
class SparkPlanGraphWrapper(
@KVIndexParam val executionId: Long,
http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 7d84f45..85face3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.status.config._
@@ -510,6 +511,50 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
}
}
+ test("eviction should respect execution completion time") {
+ val conf = sparkContext.conf.clone().set(UI_RETAINED_EXECUTIONS.key, "2")
+ val store = new ElementTrackingStore(new InMemoryStore, conf)
+ val listener = new SQLAppStatusListener(conf, store, live = true)
+ val statusStore = new SQLAppStatusStore(store, Some(listener))
+
+ var time = 0
+ val df = createTestDataFrame
+ // Start execution 1 and execution 2
+ time += 1
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 1,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ time))
+ time += 1
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 2,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ time))
+
+ // Stop execution 2 before execution 1
+ time += 1
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(2, time))
+ time += 1
+ listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, time))
+
+ // Start execution 3 and execution 2 should be evicted.
+ time += 1
+ listener.onOtherEvent(SparkListenerSQLExecutionStart(
+ 3,
+ "test",
+ "test",
+ df.queryExecution.toString,
+ SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+ time))
+ assert(statusStore.executionsCount === 2)
+ assert(statusStore.execution(2) === None)
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org