You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2018/02/21 23:37:32 UTC
spark git commit: [SPARK-23481][WEBUI] lastStageAttempt should fail
when a stage doesn't exist
Repository: spark
Updated Branches:
refs/heads/master 3fd0ccb13 -> 744d5af65
[SPARK-23481][WEBUI] lastStageAttempt should fail when a stage doesn't exist
## What changes were proposed in this pull request?
The issue here is `AppStatusStore.lastStageAttempt` will return the next available stage in the store when a stage doesn't exist.
This PR adds `last(stageId)` to ensure it returns a correct `StageData`
## How was this patch tested?
The new unit test.
Author: Shixiong Zhu <zs...@gmail.com>
Closes #20654 from zsxwing/SPARK-23481.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/744d5af6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/744d5af6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/744d5af6
Branch: refs/heads/master
Commit: 744d5af652ee8cece361cbca31e5201134e0fb42
Parents: 3fd0ccb
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Wed Feb 21 15:37:28 2018 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Wed Feb 21 15:37:28 2018 -0800
----------------------------------------------------------------------
.../apache/spark/status/AppStatusStore.scala | 6 +++-
.../spark/status/AppStatusListenerSuite.scala | 33 ++++++++++++++++++++
2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index efc2853..688f25a 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -95,7 +95,11 @@ private[spark] class AppStatusStore(
}
def lastStageAttempt(stageId: Int): v1.StageData = {
- val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
+ val it = store.view(classOf[StageDataWrapper])
+ .index("stageId")
+ .reverse()
+ .first(stageId)
+ .last(stageId)
.closeableIterator()
try {
if (it.hasNext()) {
http://git-wip-us.apache.org/repos/asf/spark/blob/744d5af6/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 7495027..673d191 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1121,6 +1121,39 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
+ test("lastStageAttempt should fail when the stage doesn't exist") {
+ val testConf = conf.clone().set(MAX_RETAINED_STAGES, 1)
+ val listener = new AppStatusListener(store, testConf, true)
+ val appStore = new AppStatusStore(store)
+
+ val stage1 = new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1")
+ val stage2 = new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2")
+ val stage3 = new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3")
+
+ time += 1
+ stage1.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage1, new Properties()))
+ stage1.completionTime = Some(time)
+ listener.onStageCompleted(SparkListenerStageCompleted(stage1))
+
+ // Make stage 3 complete before stage 2 so that stage 3 will be evicted
+ time += 1
+ stage3.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage3, new Properties()))
+ stage3.completionTime = Some(time)
+ listener.onStageCompleted(SparkListenerStageCompleted(stage3))
+
+ time += 1
+ stage2.submissionTime = Some(time)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage2, new Properties()))
+ stage2.completionTime = Some(time)
+ listener.onStageCompleted(SparkListenerStageCompleted(stage2))
+
+ assert(appStore.asOption(appStore.lastStageAttempt(1)) === None)
+ assert(appStore.asOption(appStore.lastStageAttempt(2)).map(_.stageId) === Some(2))
+ assert(appStore.asOption(appStore.lastStageAttempt(3)) === None)
+ }
+
test("driver logs") {
val listener = new AppStatusListener(store, conf, true)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org