You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/02/21 23:37:32 UTC

spark git commit: [SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

Repository: spark
Updated Branches:
  refs/heads/master 3fd0ccb13 -> 744d5af65


[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist

## What changes were proposed in this pull request?

The issue here is that `AppStatusStore.lastStageAttempt` returns the next available stage in the store when the requested stage doesn't exist.

This PR adds `last(stageId)` to the query to ensure that it returns the correct `StageData`.

## How was this patch tested?

The new unit test.

Author: Shixiong Zhu <zs...@gmail.com>

Closes #20654 from zsxwing/SPARK-23481.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/744d5af6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/744d5af6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/744d5af6

Branch: refs/heads/master
Commit: 744d5af652ee8cece361cbca31e5201134e0fb42
Parents: 3fd0ccb
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Feb 21 15:37:28 2018 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Feb 21 15:37:28 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusStore.scala    |  6 +++-
 .../spark/status/AppStatusListenerSuite.scala   | 33 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index efc2853..688f25a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
   }
 
   def lastStageAttempt(stageId: Int): v1.StageData = {
-    val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+    val it = store.view(classOf[StageDataWrapper])
+      .index("stageId")
+      .reverse()
+      .first(stageId)
+      .last(stageId)
       .closeableIterator()
     try {
       if (it.hasNext()) {

http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 7495027..673d191 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1121,6 +1121,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("lastStageAttempt should fail when the stage doesn't exist") {
+    val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1)
+    val listener = new AppStatusListener(store, testConf, true)
+    val appStore = new AppStatusStore(store)
+
+    val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+    val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+    val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+    time += 1
+    stage1.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+    stage1.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+    // Make stage 3 complete before stage 2 so that stage 3 will be evicted
+    time += 1
+    stage3.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
+    stage3.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage3))
+
+    time += 1
+    stage2.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+    stage2.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+
+    assert(appStore.asOption(appStore.lastStageAttempt(1)) === None)
+    assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === Some(2))
+    assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
+  }
+
   test("driver logs") {
     val listener = new AppStatusListener(store, conf, true)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org