You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2015/07/13 20:20:35 UTC

spark git commit: [SPARK-8950] [WEBUI] Correct the calculation of SchedulerDelay in StagePage

Repository: spark
Updated Branches:
  refs/heads/master 9b62e9375 -> 5ca26fb64


[SPARK-8950] [WEBUI] Correct the calculation of SchedulerDelay in StagePage

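In StagePage, the SchedulerDelay is calculated as totalExecutionTime - executorRunTime - executorOverhead - gettingResultTime.
However, totalExecutionTime is calculated in a way that doesn't include gettingResultTime, so gettingResultTime ends up being subtracted from a total that never contained it and the reported SchedulerDelay is too small.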

Author: Carson Wang <ca...@intel.com>

Closes #7319 from carsonwang/SchedulerDelayTime and squashes the following commits:

f66fb6e [Carson Wang] Update the code style
7d971ae [Carson Wang] Correct the calculation of SchedulerDelay


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ca26fb6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ca26fb6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ca26fb6

Branch: refs/heads/master
Commit: 5ca26fb64de99fa414dc59ce4cf29a0171894793
Parents: 9b62e93
Author: Carson Wang <ca...@intel.com>
Authored: Mon Jul 13 11:20:04 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Mon Jul 13 11:20:04 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/StagePage.scala    | 45 ++++++++++----------
 1 file changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ca26fb6/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 60e3c63..ff0a339 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -332,7 +332,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
             </td> +: getFormattedTimeQuantiles(serializationTimes)
 
           val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
-            getGettingResultTime(info).toDouble
+            getGettingResultTime(info, currentTime).toDouble
           }
           val gettingResultQuantiles =
             <td>
@@ -346,7 +346,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
           // machine and to send back the result (but not the time to fetch the task result,
           // if it needed to be fetched from the block manager on the worker).
           val schedulerDelays = validTasks.map { case TaskUIData(info, metrics, _) =>
-            getSchedulerDelay(info, metrics.get).toDouble
+            getSchedulerDelay(info, metrics.get, currentTime).toDouble
           }
           val schedulerDelayTitle = <td><span data-toggle="tooltip"
             title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
@@ -544,7 +544,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         val serializationTimeProportion = toProportion(serializationTime)
         val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
         val deserializationTimeProportion = toProportion(deserializationTime)
-        val gettingResultTime = getGettingResultTime(taskUIData.taskInfo)
+        val gettingResultTime = getGettingResultTime(taskUIData.taskInfo, currentTime)
         val gettingResultTimeProportion = toProportion(gettingResultTime)
         val schedulerDelay = totalExecutionTime -
           (executorComputingTime + shuffleReadTime + shuffleWriteTime +
@@ -685,11 +685,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
         else metrics.map(_.executorRunTime).getOrElse(1L)
       val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration)
         else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("")
-      val schedulerDelay = metrics.map(getSchedulerDelay(info, _)).getOrElse(0L)
+      val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
       val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L)
       val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
       val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
-      val gettingResultTime = getGettingResultTime(info)
+      val gettingResultTime = getGettingResultTime(info, currentTime)
 
       val maybeAccumulators = info.accumulables
       val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"}
@@ -852,32 +852,31 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
     <td>{errorSummary}{details}</td>
   }
 
-  private def getGettingResultTime(info: TaskInfo): Long = {
-    if (info.gettingResultTime > 0) {
-      if (info.finishTime > 0) {
+  private def getGettingResultTime(info: TaskInfo, currentTime: Long): Long = {
+    if (info.gettingResult) {
+      if (info.finished) {
         info.finishTime - info.gettingResultTime
       } else {
         // The task is still fetching the result.
-        System.currentTimeMillis - info.gettingResultTime
+        currentTime - info.gettingResultTime
       }
     } else {
       0L
     }
   }
 
-  private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics): Long = {
-    val totalExecutionTime =
-      if (info.gettingResult) {
-        info.gettingResultTime - info.launchTime
-      } else if (info.finished) {
-        info.finishTime - info.launchTime
-      } else {
-        0
-      }
-    val executorOverhead = (metrics.executorDeserializeTime +
-      metrics.resultSerializationTime)
-    math.max(
-      0,
-      totalExecutionTime - metrics.executorRunTime - executorOverhead - getGettingResultTime(info))
+  private def getSchedulerDelay(info: TaskInfo, metrics: TaskMetrics, currentTime: Long): Long = {
+    if (info.finished) {
+      val totalExecutionTime = info.finishTime - info.launchTime
+      val executorOverhead = (metrics.executorDeserializeTime +
+        metrics.resultSerializationTime)
+      math.max(
+        0,
+        totalExecutionTime - metrics.executorRunTime - executorOverhead -
+          getGettingResultTime(info, currentTime))
+    } else {
+      // The task is still running and the metrics like executorRunTime are not available.
+      0L
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org