You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/06 20:03:34 UTC

[1/3] git commit: Added logging of scheduler delays to UI

Updated Branches:
  refs/heads/master 87676a6af -> 3fb302c08


Added logging of scheduler delays to UI


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fc78f67d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fc78f67d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fc78f67d

Branch: refs/heads/master
Commit: fc78f67da2fd28744e8119e28f4bb8a29926b3ad
Parents: 2fead51
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Thu Nov 21 16:54:23 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Nov 21 16:54:23 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/StagePage.scala    | 33 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fc78f67d/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index fbd8228..fc8c334 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -60,11 +60,13 @@ private[spark] class StagePage(parent: JobProgressUI) {
       var activeTime = 0L
       listener.stageIdToTasksActive(stageId).foreach(activeTime += _.timeRunning(now))
 
+      val finishedTasks = listener.stageIdToTaskInfos(stageId).filter(_._1.finished)
+
       val summary =
         <div>
           <ul class="unstyled">
             <li>
-              <strong>CPU time: </strong>
+              <strong>Total duration across all tasks: </strong>
               {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
             </li>
             {if (hasShuffleRead)
@@ -104,6 +106,30 @@ private[spark] class StagePage(parent: JobProgressUI) {
           val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
             ms => parent.formatDuration(ms.toLong))
 
+          val gettingResultTimes = validTasks.map{case (info, metrics, exception) =>
+            if (info.gettingResultTime > 0) {
+              (info.finishTime - info.gettingResultTime).toDouble
+            } else {
+              0.0
+            }
+          }
+          val gettingResultQuantiles = ("Time spent fetching task results" +:
+            Distribution(gettingResultTimes).get.getQuantiles().map(
+              millis => parent.formatDuration(millis.toLong)))
+          // The scheduler delay includes the network delay to send the task to the worker
+          // machine and to send back the result (but not the time to fetch the task result,
+          // if it needed to be fetched from the block manager on the worker).
+          val schedulerDelays = validTasks.map{case (info, metrics, exception) =>
+            if (info.gettingResultTime > 0) {
+              (info.gettingResultTime - info.launchTime).toDouble
+            } else {
+              (info.finishTime - info.launchTime).toDouble
+            }
+          }
+          val schedulerDelayQuantiles = ("Scheduler delay" +:
+            Distribution(schedulerDelays).get.getQuantiles().map(
+              millis => parent.formatDuration(millis.toLong)))
+
           def getQuantileCols(data: Seq[Double]) =
             Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
 
@@ -119,7 +145,10 @@ private[spark] class StagePage(parent: JobProgressUI) {
           }
           val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
-          val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+          val listings: Seq[Seq[String]] = Seq(
+            serviceQuantiles,
+            gettingResultQuantiles,
+            schedulerDelayQuantiles,
             if (hasShuffleRead) shuffleReadQuantiles else Nil,
             if (hasShuffleWrite) shuffleWriteQuantiles else Nil)
 


[3/3] git commit: Merge pull request #205 from kayousterhout/logging

Posted by ma...@apache.org.
Merge pull request #205 from kayousterhout/logging

Added logging of scheduler delays to UI

This commit adds two metrics to the UI:

1) The time to get task results, if they're fetched remotely

2) The scheduler delay. When the scheduler starts getting overwhelmed (because it can't keep up with the rate at which tasks are being submitted), tasks get delayed at the tail end: the message from the worker saying that the task has completed ends up in a long queue and takes a while to be processed by the scheduler. This commit records that delay in the UI so that users can tell when the scheduler is becoming the bottleneck.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3fb302c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3fb302c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3fb302c0

Branch: refs/heads/master
Commit: 3fb302c08d078decd1fa7dd0fc008faff132ab7f
Parents: 87676a6 58b3aff
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Fri Dec 6 11:03:32 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Fri Dec 6 11:03:32 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/ui/jobs/StagePage.scala    | 36 ++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/3fb302c0/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------


[2/3] git commit: Fixed problem with scheduler delay

Posted by ma...@apache.org.
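Fixed problem with scheduler delay: subtract the task's executor run time from the total launch-to-result time, so the metric no longer includes time spent actually running the task.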


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/58b3aff9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/58b3aff9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/58b3aff9

Branch: refs/heads/master
Commit: 58b3aff9a871a38446aacc2d60b65199d44e56bb
Parents: fc78f67
Author: Kay Ousterhout <ka...@gmail.com>
Authored: Mon Dec 2 20:30:03 2013 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Mon Dec 2 20:30:03 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/ui/jobs/StagePage.scala  | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/58b3aff9/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index fc8c334..8deb495 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -120,11 +120,14 @@ private[spark] class StagePage(parent: JobProgressUI) {
           // machine and to send back the result (but not the time to fetch the task result,
           // if it needed to be fetched from the block manager on the worker).
           val schedulerDelays = validTasks.map{case (info, metrics, exception) =>
-            if (info.gettingResultTime > 0) {
-              (info.gettingResultTime - info.launchTime).toDouble
-            } else {
-              (info.finishTime - info.launchTime).toDouble
+            val totalExecutionTime = {
+              if (info.gettingResultTime > 0) {
+                (info.gettingResultTime - info.launchTime).toDouble
+              } else {
+                (info.finishTime - info.launchTime).toDouble
+              }
             }
+            totalExecutionTime - metrics.get.executorRunTime
           }
           val schedulerDelayQuantiles = ("Scheduler delay" +:
             Distribution(schedulerDelays).get.getQuantiles().map(