You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/02/05 10:41:58 UTC

spark git commit: [SPARK-23307][WEBUI] Sort jobs/stages/tasks/queries with the completed timestamp before cleaning up them

Repository: spark
Updated Branches:
  refs/heads/master 6fb3fd153 -> a6bf3db20


[SPARK-23307][WEBUI] Sort jobs/stages/tasks/queries with the completed timestamp before cleaning up them

## What changes were proposed in this pull request?

Sort jobs/stages/tasks/queries by their completion timestamp before cleaning them up, so that the behavior is consistent with Spark 2.2.

## How was this patch tested?

- Jenkins.
- Manually ran the following configuration and code, and checked the UI for jobs/stages/tasks/queries.

```
spark.ui.retainedJobs 10
spark.ui.retainedStages 10
spark.sql.ui.retainedExecutions 10
spark.ui.retainedTasks 10
```

```
new Thread() {
  override def run() {
    spark.range(1, 2).foreach { i =>
        Thread.sleep(10000)
    }
  }
}.start()

Thread.sleep(5000)

for (_ <- 1 to 20) {
    new Thread() {
      override def run() {
        spark.range(1, 2).foreach { i =>
        }
      }
    }.start()
}

Thread.sleep(15000)
  spark.range(1, 2).foreach { i =>
}

sc.makeRDD(1 to 100, 100).foreach { i =>
}
```

Author: Shixiong Zhu <zs...@gmail.com>

Closes #20481 from zsxwing/SPARK-23307.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6bf3db2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6bf3db2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6bf3db2

Branch: refs/heads/master
Commit: a6bf3db20773ba65cbc4f2775db7bd215e78829a
Parents: 6fb3fd1
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Mon Feb 5 18:41:49 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Mon Feb 5 18:41:49 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 13 +--
 .../org/apache/spark/status/storeTypes.scala    |  7 ++
 .../spark/status/AppStatusListenerSuite.scala   | 90 ++++++++++++++++++++
 .../sql/execution/ui/SQLAppStatusListener.scala |  4 +-
 .../sql/execution/ui/SQLAppStatusStore.scala    |  9 +-
 .../ui/SQLAppStatusListenerSuite.scala          | 45 ++++++++++
 6 files changed, 158 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 3e34bdc..ab01cdd 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -875,8 +875,8 @@ private[spark] class AppStatusListener(
       return
     }
 
-    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
-        countToDelete.toInt) { j =>
+    val view = kvstore.view(classOf[JobDataWrapper]).index("completionTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt) { j =>
       j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
     }
     toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
@@ -888,8 +888,8 @@ private[spark] class AppStatusListener(
       return
     }
 
-    val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
-        countToDelete.toInt) { s =>
+    val view = kvstore.view(classOf[StageDataWrapper]).index("completionTime").first(0L)
+    val stages = KVUtils.viewToSeq(view, countToDelete.toInt) { s =>
       s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
     }
 
@@ -945,8 +945,9 @@ private[spark] class AppStatusListener(
     val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
     if (countToDelete > 0) {
       val stageKey = Array(stage.info.stageId, stage.info.attemptNumber)
-      val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
-        .last(stageKey)
+      val view = kvstore.view(classOf[TaskDataWrapper])
+        .index(TaskIndexNames.COMPLETION_TIME)
+        .parent(stageKey)
 
       // Try to delete finished tasks only.
       val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>

http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index c9cb996..412644d 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -73,6 +73,8 @@ private[spark] class JobDataWrapper(
   @JsonIgnore @KVIndex
   private def id: Int = info.jobId
 
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
 }
 
 private[spark] class StageDataWrapper(
@@ -90,6 +92,8 @@ private[spark] class StageDataWrapper(
   @JsonIgnore @KVIndex("active")
   private def active: Boolean = info.status == StageStatus.ACTIVE
 
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTime: Long = info.completionTime.map(_.getTime).getOrElse(-1L)
 }
 
 /**
@@ -134,6 +138,7 @@ private[spark] object TaskIndexNames {
   final val STAGE = "stage"
   final val STATUS = "sta"
   final val TASK_INDEX = "idx"
+  final val COMPLETION_TIME = "ct"
 }
 
 /**
@@ -337,6 +342,8 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE)
   private def error: String = if (errorMessage.isDefined) errorMessage.get else ""
 
+  @JsonIgnore @KVIndex(value = TaskIndexNames.COMPLETION_TIME, parent = TaskIndexNames.STAGE)
+  private def completionTime: Long = launchTime + duration
 }
 
 private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 042bba7..b74d6ee 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1010,6 +1010,96 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("eviction should respect job completion time") {
+    val testConf = conf.clone().set(MAX_RETAINED_JOBS, 2)
+    val listener = new AppStatusListener(store, testConf, true)
+
+    // Start job 1 and job 2
+    time += 1
+    listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
+    time += 1
+    listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
+
+    // Stop job 2 before job 1
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    // Start job 3 and job 2 should be evicted.
+    time += 1
+    listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
+    assert(store.count(classOf[JobDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[JobDataWrapper], 2)
+    }
+  }
+
+  test("eviction should respect stage completion time") {
+    val testConf = conf.clone().set(MAX_RETAINED_STAGES, 2)
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+    // Start stage 1 and stage 2
+    time += 1
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    time += 1
+    stage2.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+
+    // Stop stage 2 before stage 1
+    time += 1
+    stage2.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+    time += 1
+    stage1.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Start stage 3 and stage 2 should be evicted.
+    stage3.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
+    assert(store.count(classOf[StageDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[StageDataWrapper], Array(2, 0))
+    }
+  }
+
+  test("eviction should respect task completion time") {
+    val testConf = conf.clone().set(MAX_RETAINED_TASKS_PER_STAGE, 2)
+    val listener = new AppStatusListener(store, testConf, true)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+
+    // Start task 1 and task 2
+    val tasks = createTasks(3, Array("1"))
+    tasks.take(2).foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, task))
+    }
+
+    // Stop task 2 before task 1
+    time += 1
+    tasks(1).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(1), null))
+    time += 1
+    tasks(0).markFinished(TaskState.FINISHED, time)
+    listener.onTaskEnd(
+      SparkListenerTaskEnd(stage1.stageId, stage1.attemptId, "taskType", Success, tasks(0), null))
+
+    // Start task 3 and task 2 should be evicted.
+    listener.onTaskStart(SparkListenerTaskStart(stage1.stageId, stage1.attemptNumber, tasks(2)))
+    assert(store.count(classOf[TaskDataWrapper]) === 2)
+    intercept[NoSuchElementException] {
+      store.read(classOf[TaskDataWrapper], tasks(1).id)
+    }
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 73a1052..53fb9a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -332,8 +332,8 @@ class SQLAppStatusListener(
       return
     }
 
-    val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]),
-        countToDelete.toInt) { e => e.completionTime.isDefined }
+    val view = kvstore.view(classOf[SQLExecutionUIData]).index("completionTime").first(0L)
+    val toDelete = KVUtils.viewToSeq(view, countToDelete.toInt)(_.completionTime.isDefined)
     toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 910f2e5..9a76584 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -23,11 +23,12 @@ import java.util.Date
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
+import com.fasterxml.jackson.annotation.JsonIgnore
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.status.KVUtils.KVIndexParam
-import org.apache.spark.util.kvstore.KVStore
+import org.apache.spark.util.kvstore.{KVIndex, KVStore}
 
 /**
  * Provides a view of a KVStore with methods that make it easy to query SQL-specific state. There's
@@ -90,7 +91,11 @@ class SQLExecutionUIData(
      * from the SQL listener instance.
      */
     @JsonDeserialize(keyAs = classOf[JLong])
-    val metricValues: Map[Long, String])
+    val metricValues: Map[Long, String]) {
+
+  @JsonIgnore @KVIndex("completionTime")
+  private def completionTimeIndex: Long = completionTime.map(_.getTime).getOrElse(-1L)
+}
 
 class SparkPlanGraphWrapper(
     @KVIndexParam val executionId: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/a6bf3db2/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 7d84f45..85face3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.status.config._
@@ -510,6 +511,50 @@ class SQLAppStatusListenerSuite extends SparkFunSuite with SharedSQLContext with
     }
   }
 
+  test("eviction should respect execution completion time") {
+    val conf = sparkContext.conf.clone().set(UI_RETAINED_EXECUTIONS.key, "2")
+    val store = new ElementTrackingStore(new InMemoryStore, conf)
+    val listener = new SQLAppStatusListener(conf, store, live = true)
+    val statusStore = new SQLAppStatusStore(store, Some(listener))
+
+    var time = 0
+    val df = createTestDataFrame
+    // Start execution 1 and execution 2
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      1,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      time))
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      2,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      time))
+
+    // Stop execution 2 before execution 1
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(2, time))
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionEnd(1, time))
+
+    // Start execution 3 and execution 2 should be evicted.
+    time += 1
+    listener.onOtherEvent(SparkListenerSQLExecutionStart(
+      3,
+      "test",
+      "test",
+      df.queryExecution.toString,
+      SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
+      time))
+    assert(statusStore.executionsCount === 2)
+    assert(statusStore.execution(2) === None)
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org