Posted to commits@spark.apache.org by js...@apache.org on 2017/09/14 12:40:53 UTC

spark git commit: [SPARK-21922] Fix duration always updating when task failed but status is still RUNNING

Repository: spark
Updated Branches:
  refs/heads/master 4e6fc6901 -> 4b88393cb


[SPARK-21922] Fix duration always updating when task failed but status is still RUNNING

## What changes were proposed in this pull request?
When the driver quits abnormally (for example, from an OOM), the executors shut down and task metrics can no longer be sent to the driver. In that case the task status stays 'RUNNING' and the duration shown on the history UI is computed as 'CurrentTime - launchTime', which grows without bound.
We can fix this by using the modification time of the event log instead, since `FsHistoryProvider` already obtains that time when it fetches the event log from the file system.
A screenshot of the result is attached to [SPARK-21922](https://issues.apache.org/jira/browse/SPARK-21922).
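
The core of the change is `TaskUIData.taskDuration` taking an optional `lastUpdateTime` (full hunk below). A simplified, self-contained sketch of its semantics, with the hypothetical `TaskInfoLite` standing in for Spark's `TaskInfo`:

```scala
// Sketch of the patched duration logic; TaskInfoLite is a stand-in type.
case class TaskInfoLite(launchTime: Long, status: String) {
  def timeRunning(currentTime: Long): Long = currentTime - launchTime
}

class TaskUIDataSketch(taskInfo: TaskInfoLite, executorRunTime: Option[Long]) {
  // Live UI: lastUpdateTime is None, so we fall back to "now" (unchanged
  // behavior). History server: lastUpdateTime is the event log's
  // modification time, so a task stuck in RUNNING stops accruing duration.
  def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = {
    if (taskInfo.status == "RUNNING") {
      Some(taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis)))
    } else {
      executorRunTime
    }
  }
}
```

The rest of the patch just threads the history provider's `attempt.lastUpdated` through `SparkUI`, `StagesTab`, and the REST resources down to this method.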
How to reproduce?
(1) Submit a job to Spark on YARN.
(2) Mock an OOM (or any other condition that makes the driver quit abnormally) for the driver.
(3) Make sure an executor is still running a task when the driver quits.
(4) Open the history server and check the result (a hypothetical reproducer sketch follows below).
This is not a corner case; there are many such jobs in our current cluster.
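
For illustration only, a hypothetical reproducer along the lines of steps (1)-(4); the object name is made up and the exact OOM trigger depends on the driver's memory settings:

```scala
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer

// Hypothetical reproducer: keep executor tasks RUNNING, then exhaust the
// driver heap so the driver exits before any task-end events are logged.
object DriverOomRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("driver-oom-repro").getOrCreate()
    val sc = spark.sparkContext

    // Long-running tasks that will still be RUNNING when the driver dies.
    val slow = sc.parallelize(1 to 100, 100).map { i => Thread.sleep(600000L); i }
    new Thread(new Runnable { def run(): Unit = slow.count() }).start()
    Thread.sleep(5000L) // give the tasks time to launch

    // Exhaust driver memory to force an abnormal driver exit.
    val leak = ArrayBuffer.empty[Array[Long]]
    while (true) leak += new Array[Long](1 << 20) // ~8 MB per iteration
  }
}
```

Submitted with `spark-submit --master yarn` in cluster mode, this should leave the stage's tasks stuck in the RUNNING state in the event log once the driver's JVM dies.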

## How was this patch tested?
Deployed a history server and opened a job that exhibits this problem.

Author: zhoukang <zh...@gmail.com>

Closes #19132 from caneGuy/zhoukang/fix-duration.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b88393c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b88393c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b88393c

Branch: refs/heads/master
Commit: 4b88393cb9f4c77f479749edf2377ed6b91280c0
Parents: 4e6fc69
Author: zhoukang <zh...@gmail.com>
Authored: Thu Sep 14 20:40:33 2017 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Sep 14 20:40:33 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +-
 .../org/apache/spark/status/api/v1/AllStagesResource.scala  | 8 +++++---
 .../org/apache/spark/status/api/v1/OneStageResource.scala   | 5 ++++-
 core/src/main/scala/org/apache/spark/ui/SparkUI.scala       | 8 ++++++--
 .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 9 +++++++--
 .../src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 1 +
 core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala   | 5 +++--
 7 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 687fd2d..20fe911 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -249,7 +249,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             val appSecManager = new SecurityManager(conf)
             SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name,
               HistoryServer.getAttemptURI(appId, attempt.attemptId),
-              attempt.startTime)
+              Some(attempt.lastUpdated), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 5602871..4a4ed95 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -47,6 +47,7 @@ private[v1] class AllStagesResource(ui: SparkUI) {
         listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId))
       }
     } yield {
+      stageUiData.lastUpdateTime = ui.lastUpdateTime
       AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false)
     }
   }
@@ -69,7 +70,8 @@ private[v1] object AllStagesResource {
       }
 
     val taskData = if (includeDetails) {
-      Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } )
+      Some(stageUiData.taskData.map { case (k, v) =>
+        k -> convertTaskData(v, stageUiData.lastUpdateTime) })
     } else {
       None
     }
@@ -136,13 +138,13 @@ private[v1] object AllStagesResource {
     }
   }
 
-  def convertTaskData(uiData: TaskUIData): TaskData = {
+  def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): TaskData = {
     new TaskData(
       taskId = uiData.taskInfo.taskId,
       index = uiData.taskInfo.index,
       attempt = uiData.taskInfo.attemptNumber,
       launchTime = new Date(uiData.taskInfo.launchTime),
-      duration = uiData.taskDuration,
+      duration = uiData.taskDuration(lastUpdateTime),
       executorId = uiData.taskInfo.executorId,
       host = uiData.taskInfo.host,
       status = uiData.taskInfo.status,

http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
index 3e6d294..f15073b 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala
@@ -35,6 +35,7 @@ private[v1] class OneStageResource(ui: SparkUI) {
   def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = {
     withStage(stageId) { stageAttempts =>
       stageAttempts.map { stage =>
+        stage.ui.lastUpdateTime = ui.lastUpdateTime
         AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
           includeDetails = true)
       }
@@ -47,6 +48,7 @@ private[v1] class OneStageResource(ui: SparkUI) {
       @PathParam("stageId") stageId: Int,
       @PathParam("stageAttemptId") stageAttemptId: Int): StageData = {
     withStageAttempt(stageId, stageAttemptId) { stage =>
+      stage.ui.lastUpdateTime = ui.lastUpdateTime
       AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui,
         includeDetails = true)
     }
@@ -81,7 +83,8 @@ private[v1] class OneStageResource(ui: SparkUI) {
       @DefaultValue("20") @QueryParam("length") length: Int,
       @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = {
     withStageAttempt(stageId, stageAttemptId) { stage =>
-      val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq
+      val tasks = stage.ui.taskData.values
+        .map{ AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq
         .sorted(OneStageResource.ordering(sortBy))
       tasks.slice(offset, offset + length)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 589f811..f3fcf27 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -50,6 +50,7 @@ private[spark] class SparkUI private (
     val operationGraphListener: RDDOperationGraphListener,
     var appName: String,
     val basePath: String,
+    val lastUpdateTime: Option[Long] = None,
     val startTime: Long)
   extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf),
     conf, basePath, "SparkUI")
@@ -176,9 +177,11 @@ private[spark] object SparkUI {
       securityManager: SecurityManager,
       appName: String,
       basePath: String,
+      lastUpdateTime: Option[Long],
       startTime: Long): SparkUI = {
     val sparkUI = create(
-      None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
+      None, conf, listenerBus, securityManager, appName, basePath,
+      lastUpdateTime = lastUpdateTime, startTime = startTime)
 
     val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
       Utils.getContextOrSparkClassLoader).asScala
@@ -204,6 +207,7 @@ private[spark] object SparkUI {
       appName: String,
       basePath: String = "",
       jobProgressListener: Option[JobProgressListener] = None,
+      lastUpdateTime: Option[Long] = None,
       startTime: Long): SparkUI = {
 
     val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
@@ -226,6 +230,6 @@ private[spark] object SparkUI {
 
     new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
       executorsListener, _jobProgressListener, storageListener, operationGraphListener,
-      appName, basePath, startTime)
+      appName, basePath, lastUpdateTime, startTime)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 633e740..4d80308 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -299,6 +299,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
           stageData.hasShuffleRead,
           stageData.hasShuffleWrite,
           stageData.hasBytesSpilled,
+          parent.lastUpdateTime,
           currentTime,
           pageSize = taskPageSize,
           sortColumn = taskSortColumn,
@@ -863,6 +864,7 @@ private[ui] class TaskDataSource(
     hasShuffleRead: Boolean,
     hasShuffleWrite: Boolean,
     hasBytesSpilled: Boolean,
+    lastUpdateTime: Option[Long],
     currentTime: Long,
     pageSize: Int,
     sortColumn: String,
@@ -889,8 +891,9 @@ private[ui] class TaskDataSource(
   private def taskRow(taskData: TaskUIData): TaskTableRowData = {
     val info = taskData.taskInfo
     val metrics = taskData.metrics
-    val duration = taskData.taskDuration.getOrElse(1L)
-    val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("")
+    val duration = taskData.taskDuration(lastUpdateTime).getOrElse(1L)
+    val formatDuration =
+      taskData.taskDuration(lastUpdateTime).map(d => UIUtils.formatDuration(d)).getOrElse("")
     val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
     val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
     val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
@@ -1154,6 +1157,7 @@ private[ui] class TaskPagedTable(
     hasShuffleRead: Boolean,
     hasShuffleWrite: Boolean,
     hasBytesSpilled: Boolean,
+    lastUpdateTime: Option[Long],
     currentTime: Long,
     pageSize: Int,
     sortColumn: String,
@@ -1179,6 +1183,7 @@ private[ui] class TaskPagedTable(
     hasShuffleRead,
     hasShuffleWrite,
     hasBytesSpilled,
+    lastUpdateTime,
     currentTime,
     pageSize,
     sortColumn,

http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index 799d769..0787ea6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -30,6 +30,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
   val progressListener = parent.jobProgressListener
   val operationGraphListener = parent.operationGraphListener
   val executorsListener = parent.executorsListener
+  val lastUpdateTime = parent.lastUpdateTime
 
   attachPage(new AllStagesPage(this))
   attachPage(new StagePage(this))

http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 9448baa..d9c87f6 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -97,6 +97,7 @@ private[spark] object UIData {
     var memoryBytesSpilled: Long = _
     var diskBytesSpilled: Long = _
     var isBlacklisted: Int = _
+    var lastUpdateTime: Option[Long] = None
 
     var schedulingPool: String = ""
     var description: Option[String] = None
@@ -133,9 +134,9 @@ private[spark] object UIData {
       _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
     }
 
-    def taskDuration: Option[Long] = {
+    def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = {
       if (taskInfo.status == "RUNNING") {
-        Some(_taskInfo.timeRunning(System.currentTimeMillis))
+        Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis)))
       } else {
         _metrics.map(_.executorRunTime)
       }

