You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/18 03:58:52 UTC

git commit: [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener

Repository: spark
Updated Branches:
  refs/heads/master 935fe65ff -> 72e9021ea


[SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener

This should reduce memory usage for the web ui as well as slightly increase its speed in draining the UI event queue.

@andrewor14

Author: Reynold Xin <rx...@apache.org>

Closes #1262 from rxin/ui-consolidate-hashtables and squashes the following commits:

1ac3f97 [Reynold Xin] Oops. Properly handle description.
f5736ad [Reynold Xin] Code review comments.
b8828dc [Reynold Xin] Merge branch 'master' into ui-consolidate-hashtables
7a7b6c4 [Reynold Xin] Revert css change.
f959bb8 [Reynold Xin] [SPARK-2299] Consolidate various stageIdTo* hash maps in JobProgressListener to speed it up.
63256f5 [Reynold Xin] [SPARK-2320] Reduce <pre> block font size.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/72e9021e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/72e9021e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/72e9021e

Branch: refs/heads/master
Commit: 72e9021eaf26f31a82120505f8b764b18fbe8d48
Parents: 935fe65
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jul 17 18:58:48 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jul 17 18:58:48 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/ui/jobs/ExecutorSummary.scala  |  36 -----
 .../apache/spark/ui/jobs/ExecutorTable.scala    |  29 ++--
 .../spark/ui/jobs/JobProgressListener.scala     | 156 +++++++------------
 .../org/apache/spark/ui/jobs/StagePage.scala    |  37 ++---
 .../org/apache/spark/ui/jobs/StageTable.scala   |  73 ++++-----
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  62 ++++++++
 .../ui/jobs/JobProgressListenerSuite.scala      |  36 +++--
 7 files changed, 205 insertions(+), 224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
deleted file mode 100644
index c4a8996..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorSummary.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * :: DeveloperApi ::
- * Class for reporting aggregated metrics for each executor in stage UI.
- */
-@DeveloperApi
-class ExecutorSummary {
-  var taskTime : Long = 0
-  var failedTasks : Int = 0
-  var succeededTasks : Int = 0
-  var inputBytes: Long = 0
-  var shuffleRead : Long = 0
-  var shuffleWrite : Long = 0
-  var memoryBytesSpilled : Long = 0
-  var diskBytesSpilled : Long = 0
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 5202095..0cc51c8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, UIUtils}
+import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
@@ -64,11 +65,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
       executorIdToAddress.put(executorId, address)
     }
 
-    val executorIdToSummary = listener.stageIdToExecutorSummaries.get(stageId)
-    executorIdToSummary match {
-      case Some(x) =>
-        x.toSeq.sortBy(_._1).map { case (k, v) => {
-          // scalastyle:off
+    listener.stageIdToData.get(stageId) match {
+      case Some(stageData: StageUIData) =>
+        stageData.executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
           <tr>
             <td>{k}</td>
             <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
@@ -76,16 +75,20 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
             <td>{v.failedTasks + v.succeededTasks}</td>
             <td>{v.failedTasks}</td>
             <td>{v.succeededTasks}</td>
-            <td sorttable_customekey={v.inputBytes.toString}>{Utils.bytesToString(v.inputBytes)}</td>
-            <td sorttable_customekey={v.shuffleRead.toString}>{Utils.bytesToString(v.shuffleRead)}</td>
-            <td sorttable_customekey={v.shuffleWrite.toString}>{Utils.bytesToString(v.shuffleWrite)}</td>
-            <td sorttable_customekey={v.memoryBytesSpilled.toString} >{Utils.bytesToString(v.memoryBytesSpilled)}</td>
-            <td sorttable_customekey={v.diskBytesSpilled.toString} >{Utils.bytesToString(v.diskBytesSpilled)}</td>
+            <td sorttable_customekey={v.inputBytes.toString}>
+              {Utils.bytesToString(v.inputBytes)}</td>
+            <td sorttable_customekey={v.shuffleRead.toString}>
+              {Utils.bytesToString(v.shuffleRead)}</td>
+            <td sorttable_customekey={v.shuffleWrite.toString}>
+              {Utils.bytesToString(v.shuffleWrite)}</td>
+            <td sorttable_customekey={v.memoryBytesSpilled.toString}>
+              {Utils.bytesToString(v.memoryBytesSpilled)}</td>
+            <td sorttable_customekey={v.diskBytesSpilled.toString}>
+              {Utils.bytesToString(v.diskBytesSpilled)}</td>
           </tr>
-          // scalastyle:on
         }
-      }
-      case _ => Seq[Node]()
+      case None =>
+        Seq.empty[Node]
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index 2286a7f..efb527b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -25,6 +25,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.ui.jobs.UIData._
 
 /**
  * :: DeveloperApi ::
@@ -35,7 +36,7 @@ import org.apache.spark.storage.BlockManagerId
  * updating the internal data structures concurrently.
  */
 @DeveloperApi
-class JobProgressListener(conf: SparkConf) extends SparkListener {
+class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
@@ -46,20 +47,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
 
-  // TODO: Should probably consolidate all following into a single hash map.
-  val stageIdToTime = HashMap[Int, Long]()
-  val stageIdToInputBytes = HashMap[Int, Long]()
-  val stageIdToShuffleRead = HashMap[Int, Long]()
-  val stageIdToShuffleWrite = HashMap[Int, Long]()
-  val stageIdToMemoryBytesSpilled = HashMap[Int, Long]()
-  val stageIdToDiskBytesSpilled = HashMap[Int, Long]()
-  val stageIdToTasksActive = HashMap[Int, HashMap[Long, TaskInfo]]()
-  val stageIdToTasksComplete = HashMap[Int, Int]()
-  val stageIdToTasksFailed = HashMap[Int, Int]()
-  val stageIdToTaskData = HashMap[Int, HashMap[Long, TaskUIData]]()
-  val stageIdToExecutorSummaries = HashMap[Int, HashMap[String, ExecutorSummary]]()
-  val stageIdToPool = HashMap[Int, String]()
-  val stageIdToDescription = HashMap[Int, String]()
+  val stageIdToData = new HashMap[Int, StageUIData]
+
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
 
   val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
@@ -71,8 +60,12 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
     val stageId = stage.stageId
-    // Remove by stageId, rather than by StageInfo, in case the StageInfo is from storage
-    poolToActiveStages(stageIdToPool(stageId)).remove(stageId)
+    val stageData = stageIdToData.getOrElseUpdate(stageId, {
+      logWarning("Stage completed for unknown stage " + stageId)
+      new StageUIData
+    })
+
+    poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
       completedStages += stage
@@ -87,21 +80,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s =>
-        stageIdToTime.remove(s.stageId)
-        stageIdToInputBytes.remove(s.stageId)
-        stageIdToShuffleRead.remove(s.stageId)
-        stageIdToShuffleWrite.remove(s.stageId)
-        stageIdToMemoryBytesSpilled.remove(s.stageId)
-        stageIdToDiskBytesSpilled.remove(s.stageId)
-        stageIdToTasksActive.remove(s.stageId)
-        stageIdToTasksComplete.remove(s.stageId)
-        stageIdToTasksFailed.remove(s.stageId)
-        stageIdToTaskData.remove(s.stageId)
-        stageIdToExecutorSummaries.remove(s.stageId)
-        stageIdToPool.remove(s.stageId)
-        stageIdToDescription.remove(s.stageId)
-      }
+      stages.take(toRemove).foreach { s => stageIdToData.remove(s.stageId) }
       stages.trimStart(toRemove)
     }
   }
@@ -114,26 +93,27 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     val poolName = Option(stageSubmitted.properties).map {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
-    stageIdToPool(stage.stageId) = poolName
 
-    val description = Option(stageSubmitted.properties).flatMap {
+    val stageData = stageIdToData.getOrElseUpdate(stage.stageId, new StageUIData)
+    stageData.schedulingPool = poolName
+
+    stageData.description = Option(stageSubmitted.properties).flatMap {
       p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
     }
-    description.map(d => stageIdToDescription(stage.stageId) = d)
 
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo]())
     stages(stage.stageId) = stage
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val sid = taskStart.stageId
     val taskInfo = taskStart.taskInfo
     if (taskInfo != null) {
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      tasksActive(taskInfo.taskId) = taskInfo
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(taskInfo.taskId) = new TaskUIData(taskInfo)
-      stageIdToTaskData(sid) = taskMap
+      val stageData = stageIdToData.getOrElseUpdate(taskStart.stageId, {
+        logWarning("Task start for unknown stage " + taskStart.stageId)
+        new StageUIData
+      })
+      stageData.numActiveTasks += 1
+      stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
     }
   }
 
@@ -143,88 +123,76 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
   }
 
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val sid = taskEnd.stageId
     val info = taskEnd.taskInfo
-
     if (info != null) {
+      val stageData = stageIdToData.getOrElseUpdate(taskEnd.stageId, {
+        logWarning("Task end for unknown stage " + taskEnd.stageId)
+        new StageUIData
+      })
+
       // create executor summary map if necessary
-      val executorSummaryMap = stageIdToExecutorSummaries.getOrElseUpdate(key = sid,
-        op = new HashMap[String, ExecutorSummary]())
+      val executorSummaryMap = stageData.executorSummary
       executorSummaryMap.getOrElseUpdate(key = info.executorId, op = new ExecutorSummary)
 
-      val executorSummary = executorSummaryMap.get(info.executorId)
-      executorSummary match {
-        case Some(y) => {
-          // first update failed-task, succeed-task
-          taskEnd.reason match {
-            case Success =>
-              y.succeededTasks += 1
-            case _ =>
-              y.failedTasks += 1
-          }
-
-          // update duration
-          y.taskTime += info.duration
-
-          val metrics = taskEnd.taskMetrics
-          if (metrics != null) {
-            metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
-            metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
-            metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
-            y.memoryBytesSpilled += metrics.memoryBytesSpilled
-            y.diskBytesSpilled += metrics.diskBytesSpilled
-          }
+      executorSummaryMap.get(info.executorId).foreach { y =>
+        // first update failed-task, succeed-task
+        taskEnd.reason match {
+          case Success =>
+            y.succeededTasks += 1
+          case _ =>
+            y.failedTasks += 1
+        }
+
+        // update duration
+        y.taskTime += info.duration
+
+        val metrics = taskEnd.taskMetrics
+        if (metrics != null) {
+          metrics.inputMetrics.foreach { y.inputBytes += _.bytesRead }
+          metrics.shuffleReadMetrics.foreach { y.shuffleRead += _.remoteBytesRead }
+          metrics.shuffleWriteMetrics.foreach { y.shuffleWrite += _.shuffleBytesWritten }
+          y.memoryBytesSpilled += metrics.memoryBytesSpilled
+          y.diskBytesSpilled += metrics.diskBytesSpilled
         }
-        case _ => {}
       }
 
-      val tasksActive = stageIdToTasksActive.getOrElseUpdate(sid, new HashMap[Long, TaskInfo]())
-      // Remove by taskId, rather than by TaskInfo, in case the TaskInfo is from storage
-      tasksActive.remove(info.taskId)
+      stageData.numActiveTasks -= 1
 
       val (errorMessage, metrics): (Option[String], Option[TaskMetrics]) =
         taskEnd.reason match {
           case org.apache.spark.Success =>
-            stageIdToTasksComplete(sid) = stageIdToTasksComplete.getOrElse(sid, 0) + 1
+            stageData.numCompleteTasks += 1
             (None, Option(taskEnd.taskMetrics))
           case e: ExceptionFailure =>  // Handle ExceptionFailure because we might have metrics
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
             (Some(e.toErrorString), e.metrics)
           case e: TaskFailedReason =>  // All other failure cases
-            stageIdToTasksFailed(sid) = stageIdToTasksFailed.getOrElse(sid, 0) + 1
+            stageData.numFailedTasks += 1
             (Some(e.toErrorString), None)
         }
 
-      stageIdToTime.getOrElseUpdate(sid, 0L)
-      val time = metrics.map(_.executorRunTime).getOrElse(0L)
-      stageIdToTime(sid) += time
 
-      stageIdToInputBytes.getOrElseUpdate(sid, 0L)
+      val taskRunTime = metrics.map(_.executorRunTime).getOrElse(0L)
+      stageData.executorRunTime += taskRunTime
       val inputBytes = metrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)
-      stageIdToInputBytes(sid) += inputBytes
+      stageData.inputBytes += inputBytes
 
-      stageIdToShuffleRead.getOrElseUpdate(sid, 0L)
       val shuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)
-      stageIdToShuffleRead(sid) += shuffleRead
+      stageData.shuffleReadBytes += shuffleRead
 
-      stageIdToShuffleWrite.getOrElseUpdate(sid, 0L)
       val shuffleWrite =
         metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L)
-      stageIdToShuffleWrite(sid) += shuffleWrite
+      stageData.shuffleWriteBytes += shuffleWrite
 
-      stageIdToMemoryBytesSpilled.getOrElseUpdate(sid, 0L)
       val memoryBytesSpilled = metrics.map(_.memoryBytesSpilled).getOrElse(0L)
-      stageIdToMemoryBytesSpilled(sid) += memoryBytesSpilled
+      stageData.memoryBytesSpilled += memoryBytesSpilled
 
-      stageIdToDiskBytesSpilled.getOrElseUpdate(sid, 0L)
       val diskBytesSpilled = metrics.map(_.diskBytesSpilled).getOrElse(0L)
-      stageIdToDiskBytesSpilled(sid) += diskBytesSpilled
+      stageData.diskBytesSpilled += diskBytesSpilled
 
-      val taskMap = stageIdToTaskData.getOrElse(sid, HashMap[Long, TaskUIData]())
-      taskMap(info.taskId) = new TaskUIData(info, metrics, errorMessage)
-      stageIdToTaskData(sid) = taskMap
+      stageData.taskData(info.taskId) = new TaskUIData(info, metrics, errorMessage)
     }
-  }
+  }  // end of onTaskEnd
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
@@ -252,12 +220,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
 
 }
 
-@DeveloperApi
-case class TaskUIData(
-    taskInfo: TaskInfo,
-    taskMetrics: Option[TaskMetrics] = None,
-    errorMessage: Option[String] = None)
-
 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 8c3821b..cab26b9 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,6 +23,7 @@ import javax.servlet.http.HttpServletRequest
 import scala.xml.Node
 
 import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
@@ -34,8 +35,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
   def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
+      val stageDataOption = listener.stageIdToData.get(stageId)
 
-      if (!listener.stageIdToTaskData.contains(stageId)) {
+      if (stageDataOption.isEmpty || stageDataOption.get.taskData.isEmpty) {
         val content =
           <div>
             <h4>Summary Metrics</h4> No tasks have started yet
@@ -45,23 +47,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           "Details for Stage %s".format(stageId), parent.headerTabs, parent)
       }
 
-      val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime)
+      val stageData = stageDataOption.get
+      val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime)
 
       val numCompleted = tasks.count(_.taskInfo.finished)
-      val inputBytes = listener.stageIdToInputBytes.getOrElse(stageId, 0L)
-      val hasInput = inputBytes > 0
-      val shuffleReadBytes = listener.stageIdToShuffleRead.getOrElse(stageId, 0L)
-      val hasShuffleRead = shuffleReadBytes > 0
-      val shuffleWriteBytes = listener.stageIdToShuffleWrite.getOrElse(stageId, 0L)
-      val hasShuffleWrite = shuffleWriteBytes > 0
-      val memoryBytesSpilled = listener.stageIdToMemoryBytesSpilled.getOrElse(stageId, 0L)
-      val diskBytesSpilled = listener.stageIdToDiskBytesSpilled.getOrElse(stageId, 0L)
-      val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
-
-      var activeTime = 0L
-      val now = System.currentTimeMillis
-      val tasksActive = listener.stageIdToTasksActive(stageId).values
-      tasksActive.foreach(activeTime += _.timeRunning(now))
+      val hasInput = stageData.inputBytes > 0
+      val hasShuffleRead = stageData.shuffleReadBytes > 0
+      val hasShuffleWrite = stageData.shuffleWriteBytes > 0
+      val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0
 
       // scalastyle:off
       val summary =
@@ -69,34 +62,34 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
           <ul class="unstyled">
             <li>
               <strong>Total task time across all tasks: </strong>
-              {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+              {UIUtils.formatDuration(stageData.executorRunTime)}
             </li>
             {if (hasInput)
               <li>
                 <strong>Input: </strong>
-                {Utils.bytesToString(inputBytes)}
+                {Utils.bytesToString(stageData.inputBytes)}
               </li>
             }
             {if (hasShuffleRead)
               <li>
                 <strong>Shuffle read: </strong>
-                {Utils.bytesToString(shuffleReadBytes)}
+                {Utils.bytesToString(stageData.shuffleReadBytes)}
               </li>
             }
             {if (hasShuffleWrite)
               <li>
                 <strong>Shuffle write: </strong>
-                {Utils.bytesToString(shuffleWriteBytes)}
+                {Utils.bytesToString(stageData.shuffleWriteBytes)}
               </li>
             }
             {if (hasBytesSpilled)
             <li>
               <strong>Shuffle spill (memory): </strong>
-              {Utils.bytesToString(memoryBytesSpilled)}
+              {Utils.bytesToString(stageData.memoryBytesSpilled)}
             </li>
             <li>
               <strong>Shuffle spill (disk): </strong>
-              {Utils.bytesToString(diskBytesSpilled)}
+              {Utils.bytesToString(stageData.diskBytesSpilled)}
             </li>
             }
           </ul>

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index fd8d0b5..5f45c0c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.ui.jobs
 
-import java.util.Date
-
-import scala.collection.mutable.HashMap
 import scala.xml.Node
 
-import org.apache.spark.scheduler.{StageInfo, TaskInfo}
+import java.util.Date
+
+import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.ui.{ToolTips, UIUtils}
 import org.apache.spark.util.Utils
 
@@ -71,14 +70,14 @@ private[ui] class StageTableBase(
     </table>
   }
 
-  private def makeProgressBar(started: Int, completed: Int, failed: String, total: Int): Seq[Node] =
+  private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
   {
     val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
     val startWidth = "width: %s%%".format((started.toDouble/total)*100)
 
     <div class="progress">
       <span style="text-align:center; position:absolute; width:100%; left:0;">
-        {completed}/{total} {failed}
+        {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
       </span>
       <div class="bar bar-completed" style={completeWidth}></div>
       <div class="bar bar-running" style={startWidth}></div>
@@ -108,13 +107,23 @@ private[ui] class StageTableBase(
       <pre class="stage-details collapsed">{s.details}</pre>
     }
 
-    listener.stageIdToDescription.get(s.stageId)
-      .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
-      .getOrElse(<div>{nameLink} {killLink} {details}</div>)
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    // Too many nested map/flatMaps with options are just annoying to read. Do this imperatively.
+    if (stageDataOption.isDefined && stageDataOption.get.description.isDefined) {
+      val desc = stageDataOption.get.description
+      <div><em>{desc}</em></div><div>{nameLink} {killLink}</div>
+    } else {
+      <div>{killLink} {nameLink} {details}</div>
+    }
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {
-    val poolName = listener.stageIdToPool.get(s.stageId)
+    val stageDataOption = listener.stageIdToData.get(s.stageId)
+    if (stageDataOption.isEmpty) {
+      return <td>{s.stageId}</td><td>No data available for this stage</td>
+    }
+
+    val stageData = stageDataOption.get
     val submissionTime = s.submissionTime match {
       case Some(t) => UIUtils.formatDate(new Date(t))
       case None => "Unknown"
@@ -124,35 +133,20 @@ private[ui] class StageTableBase(
       if (finishTime > t) finishTime - t else System.currentTimeMillis - t
     }
     val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
-    val startedTasks =
-      listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size
-    val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0)
-    val failedTasks = listener.stageIdToTasksFailed.getOrElse(s.stageId, 0) match {
-      case f if f > 0 => "(%s failed)".format(f)
-      case _ => ""
-    }
-    val totalTasks = s.numTasks
-    val inputSortable = listener.stageIdToInputBytes.getOrElse(s.stageId, 0L)
-    val inputRead = inputSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleReadSortable = listener.stageIdToShuffleRead.getOrElse(s.stageId, 0L)
-    val shuffleRead = shuffleReadSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
-    val shuffleWriteSortable = listener.stageIdToShuffleWrite.getOrElse(s.stageId, 0L)
-    val shuffleWrite = shuffleWriteSortable match {
-      case 0 => ""
-      case b => Utils.bytesToString(b)
-    }
+
+    val inputRead = stageData.inputBytes
+    val inputReadWithUnit = if (inputRead > 0) Utils.bytesToString(inputRead) else ""
+    val shuffleRead = stageData.shuffleReadBytes
+    val shuffleReadWithUnit = if (shuffleRead > 0) Utils.bytesToString(shuffleRead) else ""
+    val shuffleWrite = stageData.shuffleWriteBytes
+    val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""
+
     <td>{s.stageId}</td> ++
     {if (isFairScheduler) {
       <td>
         <a href={"%s/stages/pool?poolname=%s"
-          .format(UIUtils.prependBaseUri(basePath), poolName.get)}>
-          {poolName.get}
+          .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
+          {stageData.schedulingPool}
         </a>
       </td>
     } else {
@@ -162,11 +156,12 @@ private[ui] class StageTableBase(
     <td valign="middle">{submissionTime}</td>
     <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
     <td class="progress-cell">
-      {makeProgressBar(startedTasks, completedTasks, failedTasks, totalTasks)}
+      {makeProgressBar(stageData.numActiveTasks, stageData.numCompleteTasks,
+        stageData.numFailedTasks, s.numTasks)}
     </td>
-    <td sorttable_customekey={inputSortable.toString}>{inputRead}</td>
-    <td sorttable_customekey={shuffleReadSortable.toString}>{shuffleRead}</td>
-    <td sorttable_customekey={shuffleWriteSortable.toString}>{shuffleWrite}</td>
+    <td sorttable_customekey={inputRead.toString}>{inputReadWithUnit}</td>
+    <td sorttable_customekey={shuffleRead.toString}>{shuffleReadWithUnit}</td>
+    <td sorttable_customekey={shuffleWrite.toString}>{shuffleWriteWithUnit}</td>
   }
 
   /** Render an HTML row that represents a stage */

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
new file mode 100644
index 0000000..be11a11
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.TaskInfo
+
+import scala.collection.mutable.HashMap
+
+private[jobs] object UIData {
+
+  class ExecutorSummary {
+    var taskTime : Long = 0
+    var failedTasks : Int = 0
+    var succeededTasks : Int = 0
+    var inputBytes : Long = 0
+    var shuffleRead : Long = 0
+    var shuffleWrite : Long = 0
+    var memoryBytesSpilled : Long = 0
+    var diskBytesSpilled : Long = 0
+  }
+
+  class StageUIData {
+    var numActiveTasks: Int = _
+    var numCompleteTasks: Int = _
+    var numFailedTasks: Int = _
+
+    var executorRunTime: Long = _
+
+    var inputBytes: Long = _
+    var shuffleReadBytes: Long = _
+    var shuffleWriteBytes: Long = _
+    var memoryBytesSpilled: Long = _
+    var diskBytesSpilled: Long = _
+
+    var schedulingPool: String = ""
+    var description: Option[String] = None
+
+    var taskData = new HashMap[Long, TaskUIData]
+    var executorSummary = new HashMap[String, ExecutorSummary]
+  }
+
+  case class TaskUIData(
+      taskInfo: TaskInfo,
+      taskMetrics: Option[TaskMetrics] = None,
+      errorMessage: Option[String] = None)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/72e9021e/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index fa43b66..a855662 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -47,11 +47,11 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     }
 
     listener.completedStages.size should be (5)
-    listener.completedStages.filter(_.stageId == 50).size should be (1)
-    listener.completedStages.filter(_.stageId == 49).size should be (1)
-    listener.completedStages.filter(_.stageId == 48).size should be (1)
-    listener.completedStages.filter(_.stageId == 47).size should be (1)
-    listener.completedStages.filter(_.stageId == 46).size should be (1)
+    listener.completedStages.count(_.stageId == 50) should be (1)
+    listener.completedStages.count(_.stageId == 49) should be (1)
+    listener.completedStages.count(_.stageId == 48) should be (1)
+    listener.completedStages.count(_.stageId == 47) should be (1)
+    listener.completedStages.count(_.stageId == 46) should be (1)
   }
 
   test("test executor id to summary") {
@@ -59,9 +59,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
     val taskMetrics = new TaskMetrics()
     val shuffleReadMetrics = new ShuffleReadMetrics()
-
-    // nothing in it
-    assert(listener.stageIdToExecutorSummaries.size == 0)
+    assert(listener.stageIdToData.size === 0)
 
     // finish this task, should get updated shuffleRead
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -71,8 +69,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     var task = new ShuffleMapTask(0, null, null, 0, null)
     val taskType = Utils.getFormattedClassName(task)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
-      .shuffleRead == 1000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+      .shuffleRead === 1000)
 
     // finish a task with unknown executor-id, nothing should happen
     taskInfo =
@@ -80,7 +78,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.size == 1)
+    assert(listener.stageIdToData.size === 1)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -89,8 +87,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-1", fail())
-      .shuffleRead == 2000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-1", fail())
+      .shuffleRead === 2000)
 
     // finish this task, should get updated duration
     shuffleReadMetrics.remoteBytesRead = 1000
@@ -99,8 +97,8 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     taskInfo.finishTime = 1
     task = new ShuffleMapTask(0, null, null, 0, null)
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, taskMetrics))
-    assert(listener.stageIdToExecutorSummaries.getOrElse(0, fail()).getOrElse("exe-2", fail())
-      .shuffleRead == 1000)
+    assert(listener.stageIdToData.getOrElse(0, fail()).executorSummary.getOrElse("exe-2", fail())
+      .shuffleRead === 1000)
   }
 
   test("test task success vs failure counting for different task end reasons") {
@@ -121,13 +119,17 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
       TaskKilled,
       ExecutorLostFailure,
       UnknownReason)
+    var failCount = 0
     for (reason <- taskFailedReasons) {
       listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, reason, taskInfo, metrics))
-      assert(listener.stageIdToTasksComplete.get(task.stageId) === None)
+      failCount += 1
+      assert(listener.stageIdToData(task.stageId).numCompleteTasks === 0)
+      assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
     }
 
     // Make sure we count success as success.
     listener.onTaskEnd(SparkListenerTaskEnd(task.stageId, taskType, Success, taskInfo, metrics))
-    assert(listener.stageIdToTasksComplete.get(task.stageId) === Some(1))
+    assert(listener.stageIdToData(task.stageId).numCompleteTasks === 1)
+    assert(listener.stageIdToData(task.stageId).numFailedTasks === failCount)
   }
 }