You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/12/03 23:15:18 UTC

spark git commit: [SPARK-26219][CORE][BRANCH-2.4] Executor summary should get updated for failure jobs in the history server UI

Repository: spark
Updated Branches:
  refs/heads/branch-2.4 349e25bd4 -> 90fcd12af


[SPARK-26219][CORE][BRANCH-2.4] Executor summary should get updated for failure jobs in the history server UI

Back port the commit https://github.com/apache/spark/pull/23181 into Spark2.4 branch

Added UT

Closes #23191 from shahidki31/branch-2.4.

Authored-by: Shahid <sh...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90fcd12a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90fcd12a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90fcd12a

Branch: refs/heads/branch-2.4
Commit: 90fcd12af936792a99738789ba1eeb9a1e7e3ce1
Parents: 349e25b
Author: Shahid <sh...@gmail.com>
Authored: Mon Dec 3 15:11:43 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Mon Dec 3 15:12:03 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusListener.scala | 19 ++--
 .../spark/status/AppStatusListenerSuite.scala   | 94 ++++++++++++--------
 2 files changed, 66 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90fcd12a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index e6f0d08..5b564ef 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -599,9 +599,14 @@ private[spark] class AppStatusListener(
         }
       }
 
-      // Force an update on live applications when the number of active tasks reaches 0. This is
-      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
-      conditionalLiveUpdate(exec, now, exec.activeTasks == 0)
+      // Force an update on both live and history applications when the number of active tasks
+      // reaches 0. This is checked in some tests (e.g. SQLTestUtilsBase) so it needs to be
+      // reliably up to date.
+      if (exec.activeTasks == 0) {
+        update(exec, now)
+      } else {
+        maybeUpdate(exec, now)
+      }
     }
   }
 
@@ -954,14 +959,6 @@ private[spark] class AppStatusListener(
     }
   }
 
-  private def conditionalLiveUpdate(entity: LiveEntity, now: Long, condition: Boolean): Unit = {
-    if (condition) {
-      liveUpdate(entity, now)
-    } else {
-      maybeUpdate(entity, now)
-    }
-  }
-
   private def cleanupExecutors(count: Long): Unit = {
     // Because the limit is on the number of *dead* executors, we need to calculate whether
     // there are actually enough dead executors to be deleted.

http://git-wip-us.apache.org/repos/asf/spark/blob/90fcd12a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b6ddbe0..f34be48 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -1274,48 +1274,70 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(allJobs.head.numFailedStages == 1)
   }
 
-  test("SPARK-25451: total tasks in the executor summary should match total stage tasks") {
-    val testConf = conf.clone.set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+  Seq(true, false).foreach { live =>
+    test(s"Total tasks in the executor summary should match total stage tasks (live = $live)") {
 
-    val listener = new AppStatusListener(store, testConf, true)
+      val testConf = if (live) {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, Long.MaxValue)
+      } else {
+        conf.clone().set(LIVE_ENTITY_UPDATE_PERIOD, -1L)
+      }
 
-    val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
-    listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
-    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+      val listener = new AppStatusListener(store, testConf, live)
 
-    val tasks = createTasks(4, Array("1", "2"))
-    tasks.foreach { task =>
-      listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
-    }
+      Seq("1", "2").foreach { execId =>
+        listener.onExecutorAdded(SparkListenerExecutorAdded(0L, execId,
+          new ExecutorInfo("host1", 1, Map.empty)))
+      }
+      val stage = new StageInfo(1, 0, "stage", 4, Nil, Nil, "details")
+      listener.onJobStart(SparkListenerJobStart(1, time, Seq(stage), null))
+      listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
 
-    time += 1
-    tasks(0).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      Success, tasks(0), null))
-    time += 1
-    tasks(1).markFinished(TaskState.FINISHED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      Success, tasks(1), null))
+      val tasks = createTasks(4, Array("1", "2"))
+      tasks.foreach { task =>
+        listener.onTaskStart(SparkListenerTaskStart(stage.stageId, stage.attemptNumber, task))
+      }
 
-    stage.failureReason = Some("Failed")
-    listener.onStageCompleted(SparkListenerStageCompleted(stage))
-    time += 1
-    listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(new RuntimeException("Bad Executor"))))
+      time += 1
+      tasks(0).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(0), null))
+      time += 1
+      tasks(1).markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        Success, tasks(1), null))
 
-    time += 1
-    tasks(2).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
-    time += 1
-    tasks(3).markFinished(TaskState.FAILED, time)
-    listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptId, "taskType",
-      ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
-
-    val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
-    esummary.foreach { execSummary =>
-      assert(execSummary.failedTasks === 1)
-      assert(execSummary.succeededTasks === 1)
-      assert(execSummary.killedTasks === 0)
+      stage.failureReason = Some("Failed")
+      listener.onStageCompleted(SparkListenerStageCompleted(stage))
+      time += 1
+      listener.onJobEnd(SparkListenerJobEnd(1, time, JobFailed(
+        new RuntimeException("Bad Executor"))))
+
+      time += 1
+      tasks(2).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("1", true, Some("Lost executor")), tasks(2), null))
+      time += 1
+      tasks(3).markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stage.stageId, stage.attemptNumber, "taskType",
+        ExecutorLostFailure("2", true, Some("Lost executor")), tasks(3), null))
+
+      val esummary = store.view(classOf[ExecutorStageSummaryWrapper]).asScala.map(_.info)
+      esummary.foreach { execSummary =>
+        assert(execSummary.failedTasks === 1)
+        assert(execSummary.succeededTasks === 1)
+        assert(execSummary.killedTasks === 0)
+      }
+
+      val allExecutorSummary = store.view(classOf[ExecutorSummaryWrapper]).asScala.map(_.info)
+      assert(allExecutorSummary.size === 2)
+      allExecutorSummary.foreach { allExecSummary =>
+        assert(allExecSummary.failedTasks === 1)
+        assert(allExecSummary.activeTasks === 0)
+        assert(allExecSummary.completedTasks === 1)
+      }
+      store.delete(classOf[ExecutorSummaryWrapper], "1")
+      store.delete(classOf[ExecutorSummaryWrapper], "2")
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org