You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/12/03 23:15:18 UTC
spark git commit: [SPARK-26219][CORE][BRANCH-2.4] Executor summary
should get updated for failure jobs in the history server UI
Repository: spark
Updated Branches:
refs/heads/branch-2.4 349e25bd4 -> 90fcd12af
[SPARK-26219][CORE][BRANCH-2.4] Executor summary should get updated for failure jobs in the history server UI
Back port the commit https://github.com/apache/spark/pull/23181 into Spark2.4 branch
Added UT
Closes #23191 from shahidki31/branch-2.4.
Authored-by: Shahid <sh...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90fcd12a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90fcd12a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90fcd12a
Branch: refs/heads/branch-2.4
Commit: 90fcd12af936792a99738789ba1eeb9a1e7e3ce1
Parents: 349e25b
Author: Shahid <sh...@gmail.com>
Authored: Mon Dec 3 15:11:43 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Dec 3 15:12:03 2018 -0800
----------------------------------------------------------------------
.../apache/spark/status/AppStatusListener.scala | 19 ++--
.../spark/status/AppStatusListenerSuite.scala | 94 ++++++++++++--------
2 files changed, 66 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/90fcd12a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index e6f0d08..5b564ef 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -599,9 +599,14 @@ private[spark] class AppStatusListener(
}
}
- // Force an update on live applications when the number of active tasks reaches 0. This is
- // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
- conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
+ // Force an update on both live and history applications when the number of active tasks
+ // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
+ // reliably up to date.
+ if (exec.activeTasks == 0) {
+ update(exec, now)
+ } else {
+ maybeUpdate(exec, now)
+ }
}
}
@@ -954,14 +959,6 @@ private[spark] class AppStatusListener(
}
}
- private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
- if (condition) {
- liveUpdate(entity, now)
- } else {
- maybeUpdate(entity, now)
- }
- }
-
private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
http://git-wip-us.apache.org/repos/asf/spark/blob/90fcd12a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b6ddbe0..f34be48 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1274,48 +1274,70 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(allJobs.head.numFailedStages == 1)
}
- test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
- val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+ Seq(true, false).foreach { live =>
+ test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
- val listener = new AppStatusListener(store, testConf, true)
+ val testConf = if (live) {
+ conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+ } else {
+ conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
+ }
- val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
- listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
- listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+ val listener = new AppStatusListener(store, testConf, live)
- val tasks = createTasks(4, Array("1", "2"))
- tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
- }
+ Seq("1", "2").foreach { execId =>
+ listener.onExecutorAdded(SparkListenerExecutorAdded(0L, execId,
+ new ExecutorInfo("host1", 1, Map.empty)))
+ }
+ val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+ listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
- time += 1
- tasks(0).markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
- Success, tasks(0), null))
- time += 1
- tasks(1).markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
- Success, tasks(1), null))
+ val tasks = createTasks(4, Array("1", "2"))
+ tasks.foreach { task =>
+ listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+ }
- stage.failureReason = Some("Failed")
- listener.onStageCompleted(SparkListenerStageCompleted(stage))
- time += 1
- listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+ time += 1
+ tasks(0).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+ Success, tasks(0), null))
+ time += 1
+ tasks(1).markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+ Success, tasks(1), null))
- time += 1
- tasks(2).markFinished(TaskState.FAILED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
- ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
- time += 1
- tasks(3).markFinished(TaskState.FAILED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
- ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
-
- val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
- esummary.foreach { execSummary =>
- assert(execSummary.failedTasks === 1)
- assert(execSummary.succeededTasks === 1)
- assert(execSummary.killedTasks === 0)
+ stage.failureReason = Some("Failed")
+ listener.onStageCompleted(SparkListenerStageCompleted(stage))
+ time += 1
+ listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
+ new RuntimeException("Bad Executor"))))
+
+ time += 1
+ tasks(2).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+ ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+ time += 1
+ tasks(3).markFinished(TaskState.FAILED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+ ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+ val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+ esummary.foreach { execSummary =>
+ assert(execSummary.failedTasks === 1)
+ assert(execSummary.succeededTasks === 1)
+ assert(execSummary.killedTasks === 0)
+ }
+
+ val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+ assert(allExecutorSummary.size === 2)
+ allExecutorSummary.foreach { allExecSummary =>
+ assert(allExecSummary.failedTasks === 1)
+ assert(allExecSummary.activeTasks === 0)
+ assert(allExecSummary.completedTasks === 1)
+ }
+ store.delete(classOf[ExecutorSummaryWrapper], "1")
+ store.delete(classOf[ExecutorSummaryWrapper], "2")
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org