Posted to commits@spark.apache.org by we...@apache.org on 2018/01/11 11:42:09 UTC

[2/2] spark git commit: [SPARK-20657][CORE] Speed up rendering of the stages page.

[SPARK-20657][CORE] Speed up rendering of the stages page.

There are two main changes to speed up rendering of the task list
on the stage page.

The first change makes the code load only the tasks shown in the
current page of the task table, plus the information related to just
those tasks. One side effect is that the graph of task-related events
now only shows events for the tasks in the current page, instead of
the previously hardcoded "events for the first 1000 tasks" limit.
That ends up helping readability, though.
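
A minimal, self-contained sketch of the paging idea (illustrative only;
the actual change lives in AppStatusStore.taskList in the diff below,
which drives a KVStore view with skip()/max(); the names here are made
up for the example):

  object TaskPageSketch {
    // Stand-in for the stored task wrapper; only two fields for brevity.
    case class TaskRow(taskId: Long, execRunTime: Long)

    // With a view kept sorted by the requested column, only the rows for the
    // current page are ever materialized, mirroring skip(offset).max(length).
    def page(sortedByRunTime: IndexedSeq[TaskRow], offset: Int, length: Int): Seq[TaskRow] =
      sortedByRunTime.slice(offset, offset + length)
  }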

To make sorting efficient when using a disk store, the task wrapper
was extended with many new indices: one for each sortable column in
the UI, and one for each metric for which quantiles are calculated.
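
A rough model of those indices (illustrative; the real implementation is
TaskDataWrapper plus the short index names in TaskIndexNames, stored as
KVStore secondary indices rather than in-memory maps):

  object TaskIndexSketch {
    case class Task(taskId: Long, execRunTime: Long, resultSize: Long)

    // One pre-sorted view per sortable column, keyed by the same short names
    // used in TaskIndexNames. Sorting a page then becomes an index lookup plus
    // a slice instead of a sort over every task in the stage.
    def buildIndices(tasks: Seq[Task]): Map[String, IndexedSeq[Task]] = Map(
      "ert" -> tasks.sortBy(_.execRunTime).toIndexedSeq, // TaskIndexNames.EXEC_RUN_TIME
      "rs"  -> tasks.sortBy(_.resultSize).toIndexedSeq   // TaskIndexNames.RESULT_SIZE
    )
  }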

The second change alters how metric quantiles are calculated for
stages. Instead of using the "Distribution" class to process data for
all task metrics, which requires scanning all tasks of a stage, the
code now uses the KVStore "skip()" functionality to read only the
tasks that hold the values needed for the requested quantiles.
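
Per metric, the scan boils down to something like the following sketch
(simplified; the real code walks a KVStore index iterator and the jump is
KVStoreIterator.skip(), which avoids deserializing the skipped entries):

  object QuantileScanSketch {
    // For an index over `count` valid tasks, the value for quantile q sits at
    // position min((q * count).toLong, count - 1) in the sorted index. Walk the
    // index once, jumping straight to each requested position.
    def quantilesFromIndex(index: Iterator[Long], count: Long, quantiles: Seq[Double]): Seq[Double] = {
      var it = index
      var current = -1L
      var last = Double.NaN
      quantiles.sorted.map { q =>
        val idx = math.min((q * count).toLong, count - 1)
        if (idx == current) {
          last                                    // same position as the previous quantile
        } else {
          it = it.drop((idx - current - 1).toInt) // stand-in for skip()
          current = idx
          last = if (it.hasNext) it.next().toDouble else Double.NaN
          last
        }
      }
    }
  }

In this scheme only one entry per requested quantile is ever read from
each index; everything else is skipped over.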

This is still not cheap: because the UI and API track many metrics,
the code needs to scan one index per metric to gather the information.
Savings come mainly from skipping deserialization when using the disk
store, but the in-memory code also seems to be faster than before
(most probably because of other changes in this patch).

To make subsequent calls faster, some quantiles are cached in the
status store. This makes the UI much faster after the first time a
stage has been loaded.
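
A simplified in-memory version of that caching policy (the real code
writes a CachedQuantile row per (stage, attempt, quantile) to the store
and only trusts it while the task count still matches; the names below
are made up for the example):

  object QuantileCacheSketch {
    import scala.collection.mutable

    case class Cached(taskCount: Long, value: Double)

    private val cache = mutable.Map.empty[(Int, Int, String), Cached]

    // Only "round" quantiles (multiples of 0.05) are worth caching; this
    // covers the defaults used by the API and the stages page.
    def shouldCache(q: Double): Boolean = math.round(q * 100) % 5 == 0

    // Return a cached value if it was computed from the same number of tasks,
    // otherwise recompute and (maybe) overwrite the cache entry.
    def cachedOrCompute(stageId: Int, attemptId: Int, q: Double, taskCount: Long)
        (compute: => Double): Double = {
      val key = (stageId, attemptId, math.round(q * 100).toString)
      cache.get(key).filter(_.taskCount == taskCount).map(_.value).getOrElse {
        val value = compute
        if (shouldCache(q)) cache(key) = Cached(taskCount, value)
        value
      }
    }
  }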

With the above changes, a lot of code in the UI layer could be simplified.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #20013 from vanzin/SPARK-20657.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c70da3b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c70da3b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c70da3b

Branch: refs/heads/master
Commit: 1c70da3bfbb4016e394de2c73eb0db7cdd9a6968
Parents: 87c98de
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Jan 11 19:41:48 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jan 11 19:41:48 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/util/kvstore/LevelDB.java  |   1 +
 .../apache/spark/status/AppStatusListener.scala |  57 +-
 .../apache/spark/status/AppStatusStore.scala    | 389 +++++---
 .../apache/spark/status/AppStatusUtils.scala    |  68 ++
 .../org/apache/spark/status/LiveEntity.scala    | 344 ++++---
 .../spark/status/api/v1/StagesResource.scala    |   3 +-
 .../org/apache/spark/status/api/v1/api.scala    |   3 +
 .../org/apache/spark/status/storeTypes.scala    | 327 ++++++-
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   4 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      |   2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    | 919 ++++++-------------
 ...summary_w__custom_quantiles_expectation.json |   3 +
 ...task_summary_w_shuffle_read_expectation.json |   3 +
 ...ask_summary_w_shuffle_write_expectation.json |   3 +
 .../spark/status/AppStatusListenerSuite.scala   | 105 ++-
 .../spark/status/AppStatusStoreSuite.scala      | 104 +++
 .../org/apache/spark/ui/StagePageSuite.scala    |  10 +-
 scalastyle-config.xml                           |   2 +-
 18 files changed, 1361 insertions(+), 986 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index 4f9e10c..0e491ef 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -83,6 +83,7 @@ public class LevelDB implements KVStore {
     if (versionData != null) {
       long version = serializer.deserializeLong(versionData);
       if (version != STORE_VERSION) {
+        close();
         throw new UnsupportedStoreVersionException();
       }
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 88b75dd..b4edcf2 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -377,6 +377,10 @@ private[spark] class AppStatusListener(
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       stage.activeTasks += 1
       stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
+
+      val locality = event.taskInfo.taskLocality.toString()
+      val count = stage.localitySummary.getOrElse(locality, 0L) + 1L
+      stage.localitySummary = stage.localitySummary ++ Map(locality -> count)
       maybeUpdate(stage, now)
 
       stage.jobs.foreach { job =>
@@ -433,7 +437,7 @@ private[spark] class AppStatusListener(
       }
       task.errorMessage = errorMessage
       val delta = task.updateMetrics(event.taskMetrics)
-      update(task, now)
+      update(task, now, last = true)
       delta
     }.orNull
 
@@ -450,7 +454,7 @@ private[spark] class AppStatusListener(
 
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       if (metricsDelta != null) {
-        stage.metrics.update(metricsDelta)
+        stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, metricsDelta)
       }
       stage.activeTasks -= 1
       stage.completedTasks += completedDelta
@@ -486,7 +490,7 @@ private[spark] class AppStatusListener(
       esummary.failedTasks += failedDelta
       esummary.killedTasks += killedDelta
       if (metricsDelta != null) {
-        esummary.metrics.update(metricsDelta)
+        esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, metricsDelta)
       }
       maybeUpdate(esummary, now)
 
@@ -604,11 +608,11 @@ private[spark] class AppStatusListener(
         maybeUpdate(task, now)
 
         Option(liveStages.get((sid, sAttempt))).foreach { stage =>
-          stage.metrics.update(delta)
+          stage.metrics = LiveEntityHelpers.addMetrics(stage.metrics, delta)
           maybeUpdate(stage, now)
 
           val esummary = stage.executorSummary(event.execId)
-          esummary.metrics.update(delta)
+          esummary.metrics = LiveEntityHelpers.addMetrics(esummary.metrics, delta)
           maybeUpdate(esummary, now)
         }
       }
@@ -690,7 +694,7 @@ private[spark] class AppStatusListener(
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
       if (updatedStorageLevel.isDefined) {
-        rdd.storageLevel = updatedStorageLevel.get
+        rdd.setStorageLevel(updatedStorageLevel.get)
       }
 
       val partition = rdd.partition(block.name)
@@ -814,7 +818,7 @@ private[spark] class AppStatusListener(
 
   /** Update a live entity only if it hasn't been updated in the last configured period. */
   private def maybeUpdate(entity: LiveEntity, now: Long): Unit = {
-    if (liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
+    if (live && liveUpdatePeriodNs >= 0 && now - entity.lastWriteTime > liveUpdatePeriodNs) {
       update(entity, now)
     }
   }
@@ -865,7 +869,7 @@ private[spark] class AppStatusListener(
     }
 
     stages.foreach { s =>
-      val key = s.id
+      val key = Array(s.info.stageId, s.info.attemptId)
       kvstore.delete(s.getClass(), key)
 
       val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
@@ -885,15 +889,15 @@ private[spark] class AppStatusListener(
         .asScala
 
       tasks.foreach { t =>
-        kvstore.delete(t.getClass(), t.info.taskId)
+        kvstore.delete(t.getClass(), t.taskId)
       }
 
       // Check whether there are remaining attempts for the same stage. If there aren't, then
       // also delete the RDD graph data.
       val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
         .index("stageId")
-        .first(s.stageId)
-        .last(s.stageId)
+        .first(s.info.stageId)
+        .last(s.info.stageId)
         .closeableIterator()
 
       val hasMoreAttempts = try {
@@ -905,8 +909,10 @@ private[spark] class AppStatusListener(
       }
 
       if (!hasMoreAttempts) {
-        kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
+        kvstore.delete(classOf[RDDOperationGraphWrapper], s.info.stageId)
       }
+
+      cleanupCachedQuantiles(key)
     }
   }
 
@@ -919,9 +925,9 @@ private[spark] class AppStatusListener(
 
       // Try to delete finished tasks only.
       val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
-        !live || t.info.status != TaskState.RUNNING.toString()
+        !live || t.status != TaskState.RUNNING.toString()
       }
-      toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+      toDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
       stage.savedTasks.addAndGet(-toDelete.size)
 
       // If there are more running tasks than the configured limit, delete running tasks. This
@@ -930,13 +936,34 @@ private[spark] class AppStatusListener(
       val remaining = countToDelete - toDelete.size
       if (remaining > 0) {
         val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
-        runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
+        runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.taskId) }
         stage.savedTasks.addAndGet(-remaining)
       }
+
+      // On live applications, cleanup any cached quantiles for the stage. This makes sure that
+      // quantiles will be recalculated after tasks are replaced with newer ones.
+      //
+      // This is not needed in the SHS since caching only happens after the event logs are
+      // completely processed.
+      if (live) {
+        cleanupCachedQuantiles(stageKey)
+      }
     }
     stage.cleaning = false
   }
 
+  private def cleanupCachedQuantiles(stageKey: Array[Int]): Unit = {
+    val cachedQuantiles = kvstore.view(classOf[CachedQuantile])
+      .index("stage")
+      .first(stageKey)
+      .last(stageKey)
+      .asScala
+      .toList
+    cachedQuantiles.foreach { q =>
+      kvstore.delete(q.getClass(), q.id)
+    }
+  }
+
   /**
    * Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
    * asynchronously, this method may return 0 in case enough items have been deleted already.

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 5a942f5..efc2853 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
-import org.apache.spark.util.Distribution
+import org.apache.spark.util.{Distribution, Utils}
 import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 
 /**
@@ -98,7 +98,11 @@ private[spark] class AppStatusStore(
     val it = store.view(classOf[StageDataWrapper]).index("stageId").reverse().first(stageId)
       .closeableIterator()
     try {
-      it.next().info
+      if (it.hasNext()) {
+        it.next().info
+      } else {
+        throw new NoSuchElementException(s"No stage with id $stageId")
+      }
     } finally {
       it.close()
     }
@@ -110,107 +114,238 @@ private[spark] class AppStatusStore(
     if (details) stageWithDetails(stage) else stage
   }
 
+  def taskCount(stageId: Int, stageAttemptId: Int): Long = {
+    store.count(classOf[TaskDataWrapper], "stage", Array(stageId, stageAttemptId))
+  }
+
+  def localitySummary(stageId: Int, stageAttemptId: Int): Map[String, Long] = {
+    store.read(classOf[StageDataWrapper], Array(stageId, stageAttemptId)).locality
+  }
+
+  /**
+   * Calculates a summary of the task metrics for the given stage attempt, returning the
+   * requested quantiles for the recorded metrics.
+   *
+   * This method can be expensive if the requested quantiles are not cached; the method
+   * will only cache certain quantiles (every 0.05 step), so it's recommended to stick to
+   * those to avoid expensive scans of all task data.
+   */
   def taskSummary(
       stageId: Int,
       stageAttemptId: Int,
-      quantiles: Array[Double]): v1.TaskMetricDistributions = {
-
-    val stage = Array(stageId, stageAttemptId)
-
-    val rawMetrics = store.view(classOf[TaskDataWrapper])
-      .index("stage")
-      .first(stage)
-      .last(stage)
-      .asScala
-      .flatMap(_.info.taskMetrics)
-      .toList
-      .view
-
-    def metricQuantiles(f: v1.TaskMetrics => Double): IndexedSeq[Double] =
-      Distribution(rawMetrics.map { d => f(d) }).get.getQuantiles(quantiles)
-
-    // We need to do a lot of similar munging to nested metrics here.  For each one,
-    // we want (a) extract the values for nested metrics (b) make a distribution for each metric
-    // (c) shove the distribution into the right field in our return type and (d) only return
-    // a result if the option is defined for any of the tasks.  MetricHelper is a little util
-    // to make it a little easier to deal w/ all of the nested options.  Mostly it lets us just
-    // implement one "build" method, which just builds the quantiles for each field.
-
-    val inputMetrics =
-      new MetricHelper[v1.InputMetrics, v1.InputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.InputMetrics = raw.inputMetrics
-
-        def build: v1.InputMetricDistributions = new v1.InputMetricDistributions(
-          bytesRead = submetricQuantiles(_.bytesRead),
-          recordsRead = submetricQuantiles(_.recordsRead)
-        )
-      }.build
-
-    val outputMetrics =
-      new MetricHelper[v1.OutputMetrics, v1.OutputMetricDistributions](rawMetrics, quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.OutputMetrics = raw.outputMetrics
-
-        def build: v1.OutputMetricDistributions = new v1.OutputMetricDistributions(
-          bytesWritten = submetricQuantiles(_.bytesWritten),
-          recordsWritten = submetricQuantiles(_.recordsWritten)
-        )
-      }.build
-
-    val shuffleReadMetrics =
-      new MetricHelper[v1.ShuffleReadMetrics, v1.ShuffleReadMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleReadMetrics =
-          raw.shuffleReadMetrics
-
-        def build: v1.ShuffleReadMetricDistributions = new v1.ShuffleReadMetricDistributions(
-          readBytes = submetricQuantiles { s => s.localBytesRead + s.remoteBytesRead },
-          readRecords = submetricQuantiles(_.recordsRead),
-          remoteBytesRead = submetricQuantiles(_.remoteBytesRead),
-          remoteBytesReadToDisk = submetricQuantiles(_.remoteBytesReadToDisk),
-          remoteBlocksFetched = submetricQuantiles(_.remoteBlocksFetched),
-          localBlocksFetched = submetricQuantiles(_.localBlocksFetched),
-          totalBlocksFetched = submetricQuantiles { s =>
-            s.localBlocksFetched + s.remoteBlocksFetched
-          },
-          fetchWaitTime = submetricQuantiles(_.fetchWaitTime)
-        )
-      }.build
-
-    val shuffleWriteMetrics =
-      new MetricHelper[v1.ShuffleWriteMetrics, v1.ShuffleWriteMetricDistributions](rawMetrics,
-        quantiles) {
-        def getSubmetrics(raw: v1.TaskMetrics): v1.ShuffleWriteMetrics =
-          raw.shuffleWriteMetrics
-
-        def build: v1.ShuffleWriteMetricDistributions = new v1.ShuffleWriteMetricDistributions(
-          writeBytes = submetricQuantiles(_.bytesWritten),
-          writeRecords = submetricQuantiles(_.recordsWritten),
-          writeTime = submetricQuantiles(_.writeTime)
-        )
-      }.build
-
-    new v1.TaskMetricDistributions(
+      unsortedQuantiles: Array[Double]): Option[v1.TaskMetricDistributions] = {
+    val stageKey = Array(stageId, stageAttemptId)
+    val quantiles = unsortedQuantiles.sorted
+
+    // We don't know how many tasks remain in the store that actually have metrics. So scan one
+    // metric and count how many valid tasks there are. Use skip() instead of next() since it's
+    // cheaper for disk stores (avoids deserialization).
+    val count = {
+      Utils.tryWithResource(
+        store.view(classOf[TaskDataWrapper])
+          .parent(stageKey)
+          .index(TaskIndexNames.EXEC_RUN_TIME)
+          .first(0L)
+          .closeableIterator()
+      ) { it =>
+        var _count = 0L
+        while (it.hasNext()) {
+          _count += 1
+          it.skip(1)
+        }
+        _count
+      }
+    }
+
+    if (count <= 0) {
+      return None
+    }
+
+    // Find out which quantiles are already cached. The data in the store must match the expected
+    // task count to be considered, otherwise it will be re-scanned and overwritten.
+    val cachedQuantiles = quantiles.filter(shouldCacheQuantile).flatMap { q =>
+      val qkey = Array(stageId, stageAttemptId, quantileToString(q))
+      asOption(store.read(classOf[CachedQuantile], qkey)).filter(_.taskCount == count)
+    }
+
+    // If there are no missing quantiles, return the data. Otherwise, just compute everything
+    // to make the code simpler.
+    if (cachedQuantiles.size == quantiles.size) {
+      def toValues(fn: CachedQuantile => Double): IndexedSeq[Double] = cachedQuantiles.map(fn)
+
+      val distributions = new v1.TaskMetricDistributions(
+        quantiles = quantiles,
+        executorDeserializeTime = toValues(_.executorDeserializeTime),
+        executorDeserializeCpuTime = toValues(_.executorDeserializeCpuTime),
+        executorRunTime = toValues(_.executorRunTime),
+        executorCpuTime = toValues(_.executorCpuTime),
+        resultSize = toValues(_.resultSize),
+        jvmGcTime = toValues(_.jvmGcTime),
+        resultSerializationTime = toValues(_.resultSerializationTime),
+        gettingResultTime = toValues(_.gettingResultTime),
+        schedulerDelay = toValues(_.schedulerDelay),
+        peakExecutionMemory = toValues(_.peakExecutionMemory),
+        memoryBytesSpilled = toValues(_.memoryBytesSpilled),
+        diskBytesSpilled = toValues(_.diskBytesSpilled),
+        inputMetrics = new v1.InputMetricDistributions(
+          toValues(_.bytesRead),
+          toValues(_.recordsRead)),
+        outputMetrics = new v1.OutputMetricDistributions(
+          toValues(_.bytesWritten),
+          toValues(_.recordsWritten)),
+        shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
+          toValues(_.shuffleReadBytes),
+          toValues(_.shuffleRecordsRead),
+          toValues(_.shuffleRemoteBlocksFetched),
+          toValues(_.shuffleLocalBlocksFetched),
+          toValues(_.shuffleFetchWaitTime),
+          toValues(_.shuffleRemoteBytesRead),
+          toValues(_.shuffleRemoteBytesReadToDisk),
+          toValues(_.shuffleTotalBlocksFetched)),
+        shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
+          toValues(_.shuffleWriteBytes),
+          toValues(_.shuffleWriteRecords),
+          toValues(_.shuffleWriteTime)))
+
+      return Some(distributions)
+    }
+
+    // Compute quantiles by scanning the tasks in the store. This is not really stable for live
+    // stages (e.g. the number of recorded tasks may change while this code is running), but should
+    // stabilize once the stage finishes. It's also slow, especially with disk stores.
+    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
+
+    def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
+      Utils.tryWithResource(
+        store.view(classOf[TaskDataWrapper])
+          .parent(stageKey)
+          .index(index)
+          .first(0L)
+          .closeableIterator()
+      ) { it =>
+        var last = Double.NaN
+        var currentIdx = -1L
+        indices.map { idx =>
+          if (idx == currentIdx) {
+            last
+          } else {
+            val diff = idx - currentIdx
+            currentIdx = idx
+            if (it.skip(diff - 1)) {
+              last = fn(it.next()).toDouble
+              last
+            } else {
+              Double.NaN
+            }
+          }
+        }.toIndexedSeq
+      }
+    }
+
+    val computedQuantiles = new v1.TaskMetricDistributions(
       quantiles = quantiles,
-      executorDeserializeTime = metricQuantiles(_.executorDeserializeTime),
-      executorDeserializeCpuTime = metricQuantiles(_.executorDeserializeCpuTime),
-      executorRunTime = metricQuantiles(_.executorRunTime),
-      executorCpuTime = metricQuantiles(_.executorCpuTime),
-      resultSize = metricQuantiles(_.resultSize),
-      jvmGcTime = metricQuantiles(_.jvmGcTime),
-      resultSerializationTime = metricQuantiles(_.resultSerializationTime),
-      memoryBytesSpilled = metricQuantiles(_.memoryBytesSpilled),
-      diskBytesSpilled = metricQuantiles(_.diskBytesSpilled),
-      inputMetrics = inputMetrics,
-      outputMetrics = outputMetrics,
-      shuffleReadMetrics = shuffleReadMetrics,
-      shuffleWriteMetrics = shuffleWriteMetrics
-    )
+      executorDeserializeTime = scanTasks(TaskIndexNames.DESER_TIME) { t =>
+        t.executorDeserializeTime
+      },
+      executorDeserializeCpuTime = scanTasks(TaskIndexNames.DESER_CPU_TIME) { t =>
+        t.executorDeserializeCpuTime
+      },
+      executorRunTime = scanTasks(TaskIndexNames.EXEC_RUN_TIME) { t => t.executorRunTime },
+      executorCpuTime = scanTasks(TaskIndexNames.EXEC_CPU_TIME) { t => t.executorCpuTime },
+      resultSize = scanTasks(TaskIndexNames.RESULT_SIZE) { t => t.resultSize },
+      jvmGcTime = scanTasks(TaskIndexNames.GC_TIME) { t => t.jvmGcTime },
+      resultSerializationTime = scanTasks(TaskIndexNames.SER_TIME) { t =>
+        t.resultSerializationTime
+      },
+      gettingResultTime = scanTasks(TaskIndexNames.GETTING_RESULT_TIME) { t =>
+        t.gettingResultTime
+      },
+      schedulerDelay = scanTasks(TaskIndexNames.SCHEDULER_DELAY) { t => t.schedulerDelay },
+      peakExecutionMemory = scanTasks(TaskIndexNames.PEAK_MEM) { t => t.peakExecutionMemory },
+      memoryBytesSpilled = scanTasks(TaskIndexNames.MEM_SPILL) { t => t.memoryBytesSpilled },
+      diskBytesSpilled = scanTasks(TaskIndexNames.DISK_SPILL) { t => t.diskBytesSpilled },
+      inputMetrics = new v1.InputMetricDistributions(
+        scanTasks(TaskIndexNames.INPUT_SIZE) { t => t.inputBytesRead },
+        scanTasks(TaskIndexNames.INPUT_RECORDS) { t => t.inputRecordsRead }),
+      outputMetrics = new v1.OutputMetricDistributions(
+        scanTasks(TaskIndexNames.OUTPUT_SIZE) { t => t.outputBytesWritten },
+        scanTasks(TaskIndexNames.OUTPUT_RECORDS) { t => t.outputRecordsWritten }),
+      shuffleReadMetrics = new v1.ShuffleReadMetricDistributions(
+        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_READS) { m =>
+          m.shuffleLocalBytesRead + m.shuffleRemoteBytesRead
+        },
+        scanTasks(TaskIndexNames.SHUFFLE_READ_RECORDS) { t => t.shuffleRecordsRead },
+        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_BLOCKS) { t => t.shuffleRemoteBlocksFetched },
+        scanTasks(TaskIndexNames.SHUFFLE_LOCAL_BLOCKS) { t => t.shuffleLocalBlocksFetched },
+        scanTasks(TaskIndexNames.SHUFFLE_READ_TIME) { t => t.shuffleFetchWaitTime },
+        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS) { t => t.shuffleRemoteBytesRead },
+        scanTasks(TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK) { t =>
+          t.shuffleRemoteBytesReadToDisk
+        },
+        scanTasks(TaskIndexNames.SHUFFLE_TOTAL_BLOCKS) { m =>
+          m.shuffleLocalBlocksFetched + m.shuffleRemoteBlocksFetched
+        }),
+      shuffleWriteMetrics = new v1.ShuffleWriteMetricDistributions(
+        scanTasks(TaskIndexNames.SHUFFLE_WRITE_SIZE) { t => t.shuffleBytesWritten },
+        scanTasks(TaskIndexNames.SHUFFLE_WRITE_RECORDS) { t => t.shuffleRecordsWritten },
+        scanTasks(TaskIndexNames.SHUFFLE_WRITE_TIME) { t => t.shuffleWriteTime }))
+
+    // Go through the computed quantiles and cache the values that match the caching criteria.
+    computedQuantiles.quantiles.zipWithIndex
+      .filter { case (q, _) => quantiles.contains(q) && shouldCacheQuantile(q) }
+      .foreach { case (q, idx) =>
+        val cached = new CachedQuantile(stageId, stageAttemptId, quantileToString(q), count,
+          executorDeserializeTime = computedQuantiles.executorDeserializeTime(idx),
+          executorDeserializeCpuTime = computedQuantiles.executorDeserializeCpuTime(idx),
+          executorRunTime = computedQuantiles.executorRunTime(idx),
+          executorCpuTime = computedQuantiles.executorCpuTime(idx),
+          resultSize = computedQuantiles.resultSize(idx),
+          jvmGcTime = computedQuantiles.jvmGcTime(idx),
+          resultSerializationTime = computedQuantiles.resultSerializationTime(idx),
+          gettingResultTime = computedQuantiles.gettingResultTime(idx),
+          schedulerDelay = computedQuantiles.schedulerDelay(idx),
+          peakExecutionMemory = computedQuantiles.peakExecutionMemory(idx),
+          memoryBytesSpilled = computedQuantiles.memoryBytesSpilled(idx),
+          diskBytesSpilled = computedQuantiles.diskBytesSpilled(idx),
+
+          bytesRead = computedQuantiles.inputMetrics.bytesRead(idx),
+          recordsRead = computedQuantiles.inputMetrics.recordsRead(idx),
+
+          bytesWritten = computedQuantiles.outputMetrics.bytesWritten(idx),
+          recordsWritten = computedQuantiles.outputMetrics.recordsWritten(idx),
+
+          shuffleReadBytes = computedQuantiles.shuffleReadMetrics.readBytes(idx),
+          shuffleRecordsRead = computedQuantiles.shuffleReadMetrics.readRecords(idx),
+          shuffleRemoteBlocksFetched =
+            computedQuantiles.shuffleReadMetrics.remoteBlocksFetched(idx),
+          shuffleLocalBlocksFetched = computedQuantiles.shuffleReadMetrics.localBlocksFetched(idx),
+          shuffleFetchWaitTime = computedQuantiles.shuffleReadMetrics.fetchWaitTime(idx),
+          shuffleRemoteBytesRead = computedQuantiles.shuffleReadMetrics.remoteBytesRead(idx),
+          shuffleRemoteBytesReadToDisk =
+            computedQuantiles.shuffleReadMetrics.remoteBytesReadToDisk(idx),
+          shuffleTotalBlocksFetched = computedQuantiles.shuffleReadMetrics.totalBlocksFetched(idx),
+
+          shuffleWriteBytes = computedQuantiles.shuffleWriteMetrics.writeBytes(idx),
+          shuffleWriteRecords = computedQuantiles.shuffleWriteMetrics.writeRecords(idx),
+          shuffleWriteTime = computedQuantiles.shuffleWriteMetrics.writeTime(idx))
+        store.write(cached)
+      }
+
+    Some(computedQuantiles)
   }
 
+  /**
+   * Whether to cache information about a specific metric quantile. We cache quantiles at every 0.05
+   * step, which covers the default values used both in the API and in the stages page.
+   */
+  private def shouldCacheQuantile(q: Double): Boolean = (math.round(q * 100) % 5) == 0
+
+  private def quantileToString(q: Double): String = math.round(q * 100).toString
+
   def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = {
     val stageKey = Array(stageId, stageAttemptId)
     store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse()
-      .max(maxTasks).asScala.map(_.info).toSeq.reverse
+      .max(maxTasks).asScala.map(_.toApi).toSeq.reverse
   }
 
   def taskList(
@@ -219,18 +354,43 @@ private[spark] class AppStatusStore(
       offset: Int,
       length: Int,
       sortBy: v1.TaskSorting): Seq[v1.TaskData] = {
+    val (indexName, ascending) = sortBy match {
+      case v1.TaskSorting.ID =>
+        (None, true)
+      case v1.TaskSorting.INCREASING_RUNTIME =>
+        (Some(TaskIndexNames.EXEC_RUN_TIME), true)
+      case v1.TaskSorting.DECREASING_RUNTIME =>
+        (Some(TaskIndexNames.EXEC_RUN_TIME), false)
+    }
+    taskList(stageId, stageAttemptId, offset, length, indexName, ascending)
+  }
+
+  def taskList(
+      stageId: Int,
+      stageAttemptId: Int,
+      offset: Int,
+      length: Int,
+      sortBy: Option[String],
+      ascending: Boolean): Seq[v1.TaskData] = {
     val stageKey = Array(stageId, stageAttemptId)
     val base = store.view(classOf[TaskDataWrapper])
     val indexed = sortBy match {
-      case v1.TaskSorting.ID =>
+      case Some(index) =>
+        base.index(index).parent(stageKey)
+
+      case _ =>
+        // Sort by ID, which is the "stage" index.
         base.index("stage").first(stageKey).last(stageKey)
-      case v1.TaskSorting.INCREASING_RUNTIME =>
-        base.index("runtime").first(stageKey ++ Array(-1L)).last(stageKey ++ Array(Long.MaxValue))
-      case v1.TaskSorting.DECREASING_RUNTIME =>
-        base.index("runtime").first(stageKey ++ Array(Long.MaxValue)).last(stageKey ++ Array(-1L))
-          .reverse()
     }
-    indexed.skip(offset).max(length).asScala.map(_.info).toSeq
+
+    val ordered = if (ascending) indexed else indexed.reverse()
+    ordered.skip(offset).max(length).asScala.map(_.toApi).toSeq
+  }
+
+  def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = {
+    val stageKey = Array(stageId, attemptId)
+    store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stageKey).last(stageKey)
+      .asScala.map { exec => (exec.executorId -> exec.info) }.toMap
   }
 
   def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
@@ -256,12 +416,6 @@ private[spark] class AppStatusStore(
       .map { t => (t.taskId, t) }
       .toMap
 
-    val stageKey = Array(stage.stageId, stage.attemptId)
-    val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage").first(stageKey)
-      .last(stageKey).closeableIterator().asScala
-      .map { exec => (exec.executorId -> exec.info) }
-      .toMap
-
     new v1.StageData(
       stage.status,
       stage.stageId,
@@ -295,7 +449,7 @@ private[spark] class AppStatusStore(
       stage.rddIds,
       stage.accumulatorUpdates,
       Some(tasks),
-      Some(execs),
+      Some(executorSummary(stage.stageId, stage.attemptId)),
       stage.killedTasksSummary)
   }
 
@@ -352,22 +506,3 @@ private[spark] object AppStatusStore {
   }
 
 }
-
-/**
- * Helper for getting distributions from nested metric types.
- */
-private abstract class MetricHelper[I, O](
-    rawMetrics: Seq[v1.TaskMetrics],
-    quantiles: Array[Double]) {
-
-  def getSubmetrics(raw: v1.TaskMetrics): I
-
-  def build: O
-
-  val data: Seq[I] = rawMetrics.map(getSubmetrics)
-
-  /** applies the given function to all input metrics, and returns the quantiles */
-  def submetricQuantiles(f: I => Double): IndexedSeq[Double] = {
-    Distribution(data.map { d => f(d) }).get.getQuantiles(quantiles)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
new file mode 100644
index 0000000..341bd4e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
+
+private[spark] object AppStatusUtils {
+
+  def schedulerDelay(task: TaskData): Long = {
+    if (task.taskMetrics.isDefined && task.duration.isDefined) {
+      val m = task.taskMetrics.get
+      schedulerDelay(task.launchTime.getTime(), fetchStart(task), task.duration.get,
+        m.executorDeserializeTime, m.resultSerializationTime, m.executorRunTime)
+    } else {
+      0L
+    }
+  }
+
+  def gettingResultTime(task: TaskData): Long = {
+    gettingResultTime(task.launchTime.getTime(), fetchStart(task), task.duration.getOrElse(-1L))
+  }
+
+  def schedulerDelay(
+      launchTime: Long,
+      fetchStart: Long,
+      duration: Long,
+      deserializeTime: Long,
+      serializeTime: Long,
+      runTime: Long): Long = {
+    math.max(0, duration - runTime - deserializeTime - serializeTime -
+      gettingResultTime(launchTime, fetchStart, duration))
+  }
+
+  def gettingResultTime(launchTime: Long, fetchStart: Long, duration: Long): Long = {
+    if (fetchStart > 0) {
+      if (duration > 0) {
+        launchTime + duration - fetchStart
+      } else {
+        System.currentTimeMillis() - fetchStart
+      }
+    } else {
+      0L
+    }
+  }
+
+  private def fetchStart(task: TaskData): Long = {
+    if (task.resultFetchStart.isDefined) {
+      task.resultFetchStart.get.getTime()
+    } else {
+      -1
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 305c2fa..4295e66 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -22,6 +22,8 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.HashMap
 
+import com.google.common.collect.Interners
+
 import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
@@ -119,7 +121,9 @@ private class LiveTask(
 
   import LiveEntityHelpers._
 
-  private var recordedMetrics: v1.TaskMetrics = null
+  // The task metrics use a special value when no metrics have been reported. The special value is
+  // checked when calculating indexed values when writing to the store (see [[TaskDataWrapper]]).
+  private var metrics: v1.TaskMetrics = createMetrics(default = -1L)
 
   var errorMessage: Option[String] = None
 
@@ -129,8 +133,8 @@ private class LiveTask(
    */
   def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
     if (metrics != null) {
-      val old = recordedMetrics
-      recordedMetrics = new v1.TaskMetrics(
+      val old = this.metrics
+      val newMetrics = createMetrics(
         metrics.executorDeserializeTime,
         metrics.executorDeserializeCpuTime,
         metrics.executorRunTime,
@@ -141,73 +145,35 @@ private class LiveTask(
         metrics.memoryBytesSpilled,
         metrics.diskBytesSpilled,
         metrics.peakExecutionMemory,
-        new v1.InputMetrics(
-          metrics.inputMetrics.bytesRead,
-          metrics.inputMetrics.recordsRead),
-        new v1.OutputMetrics(
-          metrics.outputMetrics.bytesWritten,
-          metrics.outputMetrics.recordsWritten),
-        new v1.ShuffleReadMetrics(
-          metrics.shuffleReadMetrics.remoteBlocksFetched,
-          metrics.shuffleReadMetrics.localBlocksFetched,
-          metrics.shuffleReadMetrics.fetchWaitTime,
-          metrics.shuffleReadMetrics.remoteBytesRead,
-          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
-          metrics.shuffleReadMetrics.localBytesRead,
-          metrics.shuffleReadMetrics.recordsRead),
-        new v1.ShuffleWriteMetrics(
-          metrics.shuffleWriteMetrics.bytesWritten,
-          metrics.shuffleWriteMetrics.writeTime,
-          metrics.shuffleWriteMetrics.recordsWritten))
-      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
+        metrics.inputMetrics.bytesRead,
+        metrics.inputMetrics.recordsRead,
+        metrics.outputMetrics.bytesWritten,
+        metrics.outputMetrics.recordsWritten,
+        metrics.shuffleReadMetrics.remoteBlocksFetched,
+        metrics.shuffleReadMetrics.localBlocksFetched,
+        metrics.shuffleReadMetrics.fetchWaitTime,
+        metrics.shuffleReadMetrics.remoteBytesRead,
+        metrics.shuffleReadMetrics.remoteBytesReadToDisk,
+        metrics.shuffleReadMetrics.localBytesRead,
+        metrics.shuffleReadMetrics.recordsRead,
+        metrics.shuffleWriteMetrics.bytesWritten,
+        metrics.shuffleWriteMetrics.writeTime,
+        metrics.shuffleWriteMetrics.recordsWritten)
+
+      this.metrics = newMetrics
+
+      // Only calculate the delta if the old metrics contain valid information, otherwise
+      // the new metrics are the delta.
+      if (old.executorDeserializeTime >= 0L) {
+        subtractMetrics(newMetrics, old)
+      } else {
+        newMetrics
+      }
     } else {
       null
     }
   }
 
-  /**
-   * Return a new TaskMetrics object containing the delta of the various fields of the given
-   * metrics objects. This is currently targeted at updating stage data, so it does not
-   * necessarily calculate deltas for all the fields.
-   */
-  private def calculateMetricsDelta(
-      metrics: v1.TaskMetrics,
-      old: v1.TaskMetrics): v1.TaskMetrics = {
-    val shuffleWriteDelta = new v1.ShuffleWriteMetrics(
-      metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten,
-      0L,
-      metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten)
-
-    val shuffleReadDelta = new v1.ShuffleReadMetrics(
-      0L, 0L, 0L,
-      metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead,
-      metrics.shuffleReadMetrics.remoteBytesReadToDisk -
-        old.shuffleReadMetrics.remoteBytesReadToDisk,
-      metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead,
-      metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead)
-
-    val inputDelta = new v1.InputMetrics(
-      metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead,
-      metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead)
-
-    val outputDelta = new v1.OutputMetrics(
-      metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten,
-      metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten)
-
-    new v1.TaskMetrics(
-      0L, 0L,
-      metrics.executorRunTime - old.executorRunTime,
-      metrics.executorCpuTime - old.executorCpuTime,
-      0L, 0L, 0L,
-      metrics.memoryBytesSpilled - old.memoryBytesSpilled,
-      metrics.diskBytesSpilled - old.diskBytesSpilled,
-      0L,
-      inputDelta,
-      outputDelta,
-      shuffleReadDelta,
-      shuffleWriteDelta)
-  }
-
   override protected def doUpdate(): Any = {
     val duration = if (info.finished) {
       info.duration
@@ -215,22 +181,48 @@ private class LiveTask(
       info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
     }
 
-    val task = new v1.TaskData(
+    new TaskDataWrapper(
       info.taskId,
       info.index,
       info.attemptNumber,
-      new Date(info.launchTime),
-      if (info.gettingResult) Some(new Date(info.gettingResultTime)) else None,
-      Some(duration),
-      info.executorId,
-      info.host,
-      info.status,
-      info.taskLocality.toString(),
+      info.launchTime,
+      if (info.gettingResult) info.gettingResultTime else -1L,
+      duration,
+      weakIntern(info.executorId),
+      weakIntern(info.host),
+      weakIntern(info.status),
+      weakIntern(info.taskLocality.toString()),
       info.speculative,
       newAccumulatorInfos(info.accumulables),
       errorMessage,
-      Option(recordedMetrics))
-    new TaskDataWrapper(task, stageId, stageAttemptId)
+
+      metrics.executorDeserializeTime,
+      metrics.executorDeserializeCpuTime,
+      metrics.executorRunTime,
+      metrics.executorCpuTime,
+      metrics.resultSize,
+      metrics.jvmGcTime,
+      metrics.resultSerializationTime,
+      metrics.memoryBytesSpilled,
+      metrics.diskBytesSpilled,
+      metrics.peakExecutionMemory,
+      metrics.inputMetrics.bytesRead,
+      metrics.inputMetrics.recordsRead,
+      metrics.outputMetrics.bytesWritten,
+      metrics.outputMetrics.recordsWritten,
+      metrics.shuffleReadMetrics.remoteBlocksFetched,
+      metrics.shuffleReadMetrics.localBlocksFetched,
+      metrics.shuffleReadMetrics.fetchWaitTime,
+      metrics.shuffleReadMetrics.remoteBytesRead,
+      metrics.shuffleReadMetrics.remoteBytesReadToDisk,
+      metrics.shuffleReadMetrics.localBytesRead,
+      metrics.shuffleReadMetrics.recordsRead,
+      metrics.shuffleWriteMetrics.bytesWritten,
+      metrics.shuffleWriteMetrics.writeTime,
+      metrics.shuffleWriteMetrics.recordsWritten,
+
+      stageId,
+      stageAttemptId)
   }
 
 }
@@ -313,50 +305,19 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
 
 }
 
-/** Metrics tracked per stage (both total and per executor). */
-private class MetricsTracker {
-  var executorRunTime = 0L
-  var executorCpuTime = 0L
-  var inputBytes = 0L
-  var inputRecords = 0L
-  var outputBytes = 0L
-  var outputRecords = 0L
-  var shuffleReadBytes = 0L
-  var shuffleReadRecords = 0L
-  var shuffleWriteBytes = 0L
-  var shuffleWriteRecords = 0L
-  var memoryBytesSpilled = 0L
-  var diskBytesSpilled = 0L
-
-  def update(delta: v1.TaskMetrics): Unit = {
-    executorRunTime += delta.executorRunTime
-    executorCpuTime += delta.executorCpuTime
-    inputBytes += delta.inputMetrics.bytesRead
-    inputRecords += delta.inputMetrics.recordsRead
-    outputBytes += delta.outputMetrics.bytesWritten
-    outputRecords += delta.outputMetrics.recordsWritten
-    shuffleReadBytes += delta.shuffleReadMetrics.localBytesRead +
-      delta.shuffleReadMetrics.remoteBytesRead
-    shuffleReadRecords += delta.shuffleReadMetrics.recordsRead
-    shuffleWriteBytes += delta.shuffleWriteMetrics.bytesWritten
-    shuffleWriteRecords += delta.shuffleWriteMetrics.recordsWritten
-    memoryBytesSpilled += delta.memoryBytesSpilled
-    diskBytesSpilled += delta.diskBytesSpilled
-  }
-
-}
-
 private class LiveExecutorStageSummary(
     stageId: Int,
     attemptId: Int,
     executorId: String) extends LiveEntity {
 
+  import LiveEntityHelpers._
+
   var taskTime = 0L
   var succeededTasks = 0
   var failedTasks = 0
   var killedTasks = 0
 
-  val metrics = new MetricsTracker()
+  var metrics = createMetrics(default = 0L)
 
   override protected def doUpdate(): Any = {
     val info = new v1.ExecutorStageSummary(
@@ -364,14 +325,14 @@ private class LiveExecutorStageSummary(
       failedTasks,
       succeededTasks,
       killedTasks,
-      metrics.inputBytes,
-      metrics.inputRecords,
-      metrics.outputBytes,
-      metrics.outputRecords,
-      metrics.shuffleReadBytes,
-      metrics.shuffleReadRecords,
-      metrics.shuffleWriteBytes,
-      metrics.shuffleWriteRecords,
+      metrics.inputMetrics.bytesRead,
+      metrics.inputMetrics.recordsRead,
+      metrics.outputMetrics.bytesWritten,
+      metrics.outputMetrics.recordsWritten,
+      metrics.shuffleReadMetrics.remoteBytesRead + metrics.shuffleReadMetrics.localBytesRead,
+      metrics.shuffleReadMetrics.recordsRead,
+      metrics.shuffleWriteMetrics.bytesWritten,
+      metrics.shuffleWriteMetrics.recordsWritten,
       metrics.memoryBytesSpilled,
       metrics.diskBytesSpilled)
     new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
@@ -402,7 +363,9 @@ private class LiveStage extends LiveEntity {
 
   var firstLaunchTime = Long.MaxValue
 
-  val metrics = new MetricsTracker()
+  var localitySummary: Map[String, Long] = Map()
+
+  var metrics = createMetrics(default = 0L)
 
   val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
 
@@ -435,14 +398,14 @@ private class LiveStage extends LiveEntity {
       info.completionTime.map(new Date(_)),
       info.failureReason,
 
-      metrics.inputBytes,
-      metrics.inputRecords,
-      metrics.outputBytes,
-      metrics.outputRecords,
-      metrics.shuffleReadBytes,
-      metrics.shuffleReadRecords,
-      metrics.shuffleWriteBytes,
-      metrics.shuffleWriteRecords,
+      metrics.inputMetrics.bytesRead,
+      metrics.inputMetrics.recordsRead,
+      metrics.outputMetrics.bytesWritten,
+      metrics.outputMetrics.recordsWritten,
+      metrics.shuffleReadMetrics.localBytesRead + metrics.shuffleReadMetrics.remoteBytesRead,
+      metrics.shuffleReadMetrics.recordsRead,
+      metrics.shuffleWriteMetrics.bytesWritten,
+      metrics.shuffleWriteMetrics.recordsWritten,
       metrics.memoryBytesSpilled,
       metrics.diskBytesSpilled,
 
@@ -459,13 +422,15 @@ private class LiveStage extends LiveEntity {
   }
 
   override protected def doUpdate(): Any = {
-    new StageDataWrapper(toApi(), jobIds)
+    new StageDataWrapper(toApi(), jobIds, localitySummary)
   }
 
 }
 
 private class LiveRDDPartition(val blockName: String) {
 
+  import LiveEntityHelpers._
+
   // Pointers used by RDDPartitionSeq.
   @volatile var prev: LiveRDDPartition = null
   @volatile var next: LiveRDDPartition = null
@@ -485,7 +450,7 @@ private class LiveRDDPartition(val blockName: String) {
       diskUsed: Long): Unit = {
     value = new v1.RDDPartitionInfo(
       blockName,
-      storageLevel,
+      weakIntern(storageLevel),
       memoryUsed,
       diskUsed,
       executors)
@@ -495,6 +460,8 @@ private class LiveRDDPartition(val blockName: String) {
 
 private class LiveRDDDistribution(exec: LiveExecutor) {
 
+  import LiveEntityHelpers._
+
   val executorId = exec.executorId
   var memoryUsed = 0L
   var diskUsed = 0L
@@ -508,7 +475,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
   def toApi(): v1.RDDDataDistribution = {
     if (lastUpdate == null) {
       lastUpdate = new v1.RDDDataDistribution(
-        exec.hostPort,
+        weakIntern(exec.hostPort),
         memoryUsed,
         exec.maxMemory - exec.memoryUsed,
         diskUsed,
@@ -524,7 +491,9 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
 
 private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
-  var storageLevel: String = info.storageLevel.description
+  import LiveEntityHelpers._
+
+  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L
 
@@ -533,6 +502,10 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
 
   private val distributions = new HashMap[String, LiveRDDDistribution]()
 
+  def setStorageLevel(level: String): Unit = {
+    this.storageLevel = weakIntern(level)
+  }
+
   def partition(blockName: String): LiveRDDPartition = {
     partitions.getOrElseUpdate(blockName, {
       val part = new LiveRDDPartition(blockName)
@@ -593,6 +566,9 @@ private class SchedulerPool(name: String) extends LiveEntity {
 
 private object LiveEntityHelpers {
 
+  private val stringInterner = Interners.newWeakInterner[String]()
+
+
   def newAccumulatorInfos(accums: Iterable[AccumulableInfo]): Seq[v1.AccumulableInfo] = {
     accums
       .filter { acc =>
@@ -604,13 +580,119 @@ private object LiveEntityHelpers {
       .map { acc =>
         new v1.AccumulableInfo(
           acc.id,
-          acc.name.orNull,
+          acc.name.map(weakIntern).orNull,
           acc.update.map(_.toString()),
           acc.value.map(_.toString()).orNull)
       }
       .toSeq
   }
 
+  /** String interning to reduce the memory usage. */
+  def weakIntern(s: String): String = {
+    stringInterner.intern(s)
+  }
+
+  // scalastyle:off argcount
+  def createMetrics(
+      executorDeserializeTime: Long,
+      executorDeserializeCpuTime: Long,
+      executorRunTime: Long,
+      executorCpuTime: Long,
+      resultSize: Long,
+      jvmGcTime: Long,
+      resultSerializationTime: Long,
+      memoryBytesSpilled: Long,
+      diskBytesSpilled: Long,
+      peakExecutionMemory: Long,
+      inputBytesRead: Long,
+      inputRecordsRead: Long,
+      outputBytesWritten: Long,
+      outputRecordsWritten: Long,
+      shuffleRemoteBlocksFetched: Long,
+      shuffleLocalBlocksFetched: Long,
+      shuffleFetchWaitTime: Long,
+      shuffleRemoteBytesRead: Long,
+      shuffleRemoteBytesReadToDisk: Long,
+      shuffleLocalBytesRead: Long,
+      shuffleRecordsRead: Long,
+      shuffleBytesWritten: Long,
+      shuffleWriteTime: Long,
+      shuffleRecordsWritten: Long): v1.TaskMetrics = {
+    new v1.TaskMetrics(
+      executorDeserializeTime,
+      executorDeserializeCpuTime,
+      executorRunTime,
+      executorCpuTime,
+      resultSize,
+      jvmGcTime,
+      resultSerializationTime,
+      memoryBytesSpilled,
+      diskBytesSpilled,
+      peakExecutionMemory,
+      new v1.InputMetrics(
+        inputBytesRead,
+        inputRecordsRead),
+      new v1.OutputMetrics(
+        outputBytesWritten,
+        outputRecordsWritten),
+      new v1.ShuffleReadMetrics(
+        shuffleRemoteBlocksFetched,
+        shuffleLocalBlocksFetched,
+        shuffleFetchWaitTime,
+        shuffleRemoteBytesRead,
+        shuffleRemoteBytesReadToDisk,
+        shuffleLocalBytesRead,
+        shuffleRecordsRead),
+      new v1.ShuffleWriteMetrics(
+        shuffleBytesWritten,
+        shuffleWriteTime,
+        shuffleRecordsWritten))
+  }
+  // scalastyle:on argcount
+
+  def createMetrics(default: Long): v1.TaskMetrics = {
+    createMetrics(default, default, default, default, default, default, default, default,
+      default, default, default, default, default, default, default, default,
+      default, default, default, default, default, default, default, default)
+  }
+
+  /** Add m2 values to m1. */
+  def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics): v1.TaskMetrics = addMetrics(m1, m2, 1)
+
+  /** Subtract m2 values from m1. */
+  def subtractMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics): v1.TaskMetrics = {
+    addMetrics(m1, m2, -1)
+  }
+
+  private def addMetrics(m1: v1.TaskMetrics, m2: v1.TaskMetrics, mult: Int): v1.TaskMetrics = {
+    createMetrics(
+      m1.executorDeserializeTime + m2.executorDeserializeTime * mult,
+      m1.executorDeserializeCpuTime + m2.executorDeserializeCpuTime * mult,
+      m1.executorRunTime + m2.executorRunTime * mult,
+      m1.executorCpuTime + m2.executorCpuTime * mult,
+      m1.resultSize + m2.resultSize * mult,
+      m1.jvmGcTime + m2.jvmGcTime * mult,
+      m1.resultSerializationTime + m2.resultSerializationTime * mult,
+      m1.memoryBytesSpilled + m2.memoryBytesSpilled * mult,
+      m1.diskBytesSpilled + m2.diskBytesSpilled * mult,
+      m1.peakExecutionMemory + m2.peakExecutionMemory * mult,
+      m1.inputMetrics.bytesRead + m2.inputMetrics.bytesRead * mult,
+      m1.inputMetrics.recordsRead + m2.inputMetrics.recordsRead * mult,
+      m1.outputMetrics.bytesWritten + m2.outputMetrics.bytesWritten * mult,
+      m1.outputMetrics.recordsWritten + m2.outputMetrics.recordsWritten * mult,
+      m1.shuffleReadMetrics.remoteBlocksFetched + m2.shuffleReadMetrics.remoteBlocksFetched * mult,
+      m1.shuffleReadMetrics.localBlocksFetched + m2.shuffleReadMetrics.localBlocksFetched * mult,
+      m1.shuffleReadMetrics.fetchWaitTime + m2.shuffleReadMetrics.fetchWaitTime * mult,
+      m1.shuffleReadMetrics.remoteBytesRead + m2.shuffleReadMetrics.remoteBytesRead * mult,
+      m1.shuffleReadMetrics.remoteBytesReadToDisk +
+        m2.shuffleReadMetrics.remoteBytesReadToDisk * mult,
+      m1.shuffleReadMetrics.localBytesRead + m2.shuffleReadMetrics.localBytesRead * mult,
+      m1.shuffleReadMetrics.recordsRead + m2.shuffleReadMetrics.recordsRead * mult,
+      m1.shuffleWriteMetrics.bytesWritten + m2.shuffleWriteMetrics.bytesWritten * mult,
+      m1.shuffleWriteMetrics.writeTime + m2.shuffleWriteMetrics.writeTime * mult,
+      m1.shuffleWriteMetrics.recordsWritten + m2.shuffleWriteMetrics.recordsWritten * mult)
+  }
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
index 3b87954..96249e4 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/StagesResource.scala
@@ -87,7 +87,8 @@ private[v1] class StagesResource extends BaseAppResource {
       }
     }
 
-    ui.store.taskSummary(stageId, stageAttemptId, quantiles)
+    ui.store.taskSummary(stageId, stageAttemptId, quantiles).getOrElse(
+      throw new NotFoundException(s"No tasks reported metrics for $stageId / $stageAttemptId yet."))
   }
 
   @GET

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 45eaf93..7d8e4de 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -261,6 +261,9 @@ class TaskMetricDistributions private[spark](
     val resultSize: IndexedSeq[Double],
     val jvmGcTime: IndexedSeq[Double],
     val resultSerializationTime: IndexedSeq[Double],
+    val gettingResultTime: IndexedSeq[Double],
+    val schedulerDelay: IndexedSeq[Double],
+    val peakExecutionMemory: IndexedSeq[Double],
     val memoryBytesSpilled: IndexedSeq[Double],
     val diskBytesSpilled: IndexedSeq[Double],
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 1cfd30d..c9cb996 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.status
 
-import java.lang.{Integer => JInteger, Long => JLong}
+import java.lang.{Long => JLong}
+import java.util.Date
 
 import com.fasterxml.jackson.annotation.JsonIgnore
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1._
@@ -49,10 +51,10 @@ private[spark] class ApplicationEnvironmentInfoWrapper(val info: ApplicationEnvi
 private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
 
   @JsonIgnore @KVIndex
-  private[this] val id: String = info.id
+  private def id: String = info.id
 
   @JsonIgnore @KVIndex("active")
-  private[this] val active: Boolean = info.isActive
+  private def active: Boolean = info.isActive
 
   @JsonIgnore @KVIndex("host")
   val host: String = info.hostPort.split(":")(0)
@@ -69,51 +71,271 @@ private[spark] class JobDataWrapper(
     val skippedStages: Set[Int]) {
 
   @JsonIgnore @KVIndex
-  private[this] val id: Int = info.jobId
+  private def id: Int = info.jobId
 
 }
 
 private[spark] class StageDataWrapper(
     val info: StageData,
-    val jobIds: Set[Int]) {
+    val jobIds: Set[Int],
+    @JsonDeserialize(contentAs = classOf[JLong])
+    val locality: Map[String, Long]) {
 
   @JsonIgnore @KVIndex
-  def id: Array[Int] = Array(info.stageId, info.attemptId)
+  private[this] val id: Array[Int] = Array(info.stageId, info.attemptId)
 
   @JsonIgnore @KVIndex("stageId")
-  def stageId: Int = info.stageId
+  private def stageId: Int = info.stageId
 
+  @JsonIgnore @KVIndex("active")
+  private def active: Boolean = info.status == StageStatus.ACTIVE
+
+}
+
+/**
+ * Tasks have a lot of indices that are used in a few different places. This object keeps logical
+ * names for these indices, mapped to short strings to save space when using a disk store.
+ */
+private[spark] object TaskIndexNames {
+  final val ACCUMULATORS = "acc"
+  final val ATTEMPT = "att"
+  final val DESER_CPU_TIME = "dct"
+  final val DESER_TIME = "des"
+  final val DISK_SPILL = "dbs"
+  final val DURATION = "dur"
+  final val ERROR = "err"
+  final val EXECUTOR = "exe"
+  final val EXEC_CPU_TIME = "ect"
+  final val EXEC_RUN_TIME = "ert"
+  final val GC_TIME = "gc"
+  final val GETTING_RESULT_TIME = "grt"
+  final val INPUT_RECORDS = "ir"
+  final val INPUT_SIZE = "is"
+  final val LAUNCH_TIME = "lt"
+  final val LOCALITY = "loc"
+  final val MEM_SPILL = "mbs"
+  final val OUTPUT_RECORDS = "or"
+  final val OUTPUT_SIZE = "os"
+  final val PEAK_MEM = "pem"
+  final val RESULT_SIZE = "rs"
+  final val SCHEDULER_DELAY = "dly"
+  final val SER_TIME = "rst"
+  final val SHUFFLE_LOCAL_BLOCKS = "slbl"
+  final val SHUFFLE_READ_RECORDS = "srr"
+  final val SHUFFLE_READ_TIME = "srt"
+  final val SHUFFLE_REMOTE_BLOCKS = "srbl"
+  final val SHUFFLE_REMOTE_READS = "srby"
+  final val SHUFFLE_REMOTE_READS_TO_DISK = "srbd"
+  final val SHUFFLE_TOTAL_READS = "stby"
+  final val SHUFFLE_TOTAL_BLOCKS = "stbl"
+  final val SHUFFLE_WRITE_RECORDS = "swr"
+  final val SHUFFLE_WRITE_SIZE = "sws"
+  final val SHUFFLE_WRITE_TIME = "swt"
+  final val STAGE = "stage"
+  final val STATUS = "sta"
+  final val TASK_INDEX = "idx"
 }
 
 /**
- * The task information is always indexed with the stage ID, since that is how the UI and API
- * consume it. That means every indexed value has the stage ID and attempt ID included, aside
- * from the actual data being indexed.
+ * Unlike other data types, the task data wrapper does not keep a reference to the API's TaskData.
+ * That is to save memory, since for large applications there can be a large number of these
+ * elements (by default up to 100,000 per stage), and every bit of wasted memory adds up.
+ *
+ * It also contains many secondary indices, which make sorting the data in the UI efficient at the
+ * cost of extra storage space and slower writes.
  */
 private[spark] class TaskDataWrapper(
-    val info: TaskData,
+    // Storing this as a boxed object actually saves memory: the same value is used as the key in
+    // the in-memory store, so keeping the boxed form here avoids an extra copy of it.
+    @KVIndexParam
+    val taskId: JLong,
+    @KVIndexParam(value = TaskIndexNames.TASK_INDEX, parent = TaskIndexNames.STAGE)
+    val index: Int,
+    @KVIndexParam(value = TaskIndexNames.ATTEMPT, parent = TaskIndexNames.STAGE)
+    val attempt: Int,
+    @KVIndexParam(value = TaskIndexNames.LAUNCH_TIME, parent = TaskIndexNames.STAGE)
+    val launchTime: Long,
+    val resultFetchStart: Long,
+    @KVIndexParam(value = TaskIndexNames.DURATION, parent = TaskIndexNames.STAGE)
+    val duration: Long,
+    @KVIndexParam(value = TaskIndexNames.EXECUTOR, parent = TaskIndexNames.STAGE)
+    val executorId: String,
+    val host: String,
+    @KVIndexParam(value = TaskIndexNames.STATUS, parent = TaskIndexNames.STAGE)
+    val status: String,
+    @KVIndexParam(value = TaskIndexNames.LOCALITY, parent = TaskIndexNames.STAGE)
+    val taskLocality: String,
+    val speculative: Boolean,
+    val accumulatorUpdates: Seq[AccumulableInfo],
+    val errorMessage: Option[String],
+
+    // The following is an exploded view of a TaskMetrics API object. This saves 5 objects
+    // (= 80 bytes of Java object overhead) per instance of this wrapper. If the first value
+    // (executorDeserializeTime) is -1L, it means the metrics for this task have not been
+    // recorded.
+    @KVIndexParam(value = TaskIndexNames.DESER_TIME, parent = TaskIndexNames.STAGE)
+    val executorDeserializeTime: Long,
+    @KVIndexParam(value = TaskIndexNames.DESER_CPU_TIME, parent = TaskIndexNames.STAGE)
+    val executorDeserializeCpuTime: Long,
+    @KVIndexParam(value = TaskIndexNames.EXEC_RUN_TIME, parent = TaskIndexNames.STAGE)
+    val executorRunTime: Long,
+    @KVIndexParam(value = TaskIndexNames.EXEC_CPU_TIME, parent = TaskIndexNames.STAGE)
+    val executorCpuTime: Long,
+    @KVIndexParam(value = TaskIndexNames.RESULT_SIZE, parent = TaskIndexNames.STAGE)
+    val resultSize: Long,
+    @KVIndexParam(value = TaskIndexNames.GC_TIME, parent = TaskIndexNames.STAGE)
+    val jvmGcTime: Long,
+    @KVIndexParam(value = TaskIndexNames.SER_TIME, parent = TaskIndexNames.STAGE)
+    val resultSerializationTime: Long,
+    @KVIndexParam(value = TaskIndexNames.MEM_SPILL, parent = TaskIndexNames.STAGE)
+    val memoryBytesSpilled: Long,
+    @KVIndexParam(value = TaskIndexNames.DISK_SPILL, parent = TaskIndexNames.STAGE)
+    val diskBytesSpilled: Long,
+    @KVIndexParam(value = TaskIndexNames.PEAK_MEM, parent = TaskIndexNames.STAGE)
+    val peakExecutionMemory: Long,
+    @KVIndexParam(value = TaskIndexNames.INPUT_SIZE, parent = TaskIndexNames.STAGE)
+    val inputBytesRead: Long,
+    @KVIndexParam(value = TaskIndexNames.INPUT_RECORDS, parent = TaskIndexNames.STAGE)
+    val inputRecordsRead: Long,
+    @KVIndexParam(value = TaskIndexNames.OUTPUT_SIZE, parent = TaskIndexNames.STAGE)
+    val outputBytesWritten: Long,
+    @KVIndexParam(value = TaskIndexNames.OUTPUT_RECORDS, parent = TaskIndexNames.STAGE)
+    val outputRecordsWritten: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_REMOTE_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteBlocksFetched: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_LOCAL_BLOCKS, parent = TaskIndexNames.STAGE)
+    val shuffleLocalBlocksFetched: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_TIME, parent = TaskIndexNames.STAGE)
+    val shuffleFetchWaitTime: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_REMOTE_READS, parent = TaskIndexNames.STAGE)
+    val shuffleRemoteBytesRead: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_REMOTE_READS_TO_DISK,
+      parent = TaskIndexNames.STAGE)
+    val shuffleRemoteBytesReadToDisk: Long,
+    val shuffleLocalBytesRead: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_READ_RECORDS, parent = TaskIndexNames.STAGE)
+    val shuffleRecordsRead: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_WRITE_SIZE, parent = TaskIndexNames.STAGE)
+    val shuffleBytesWritten: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_WRITE_TIME, parent = TaskIndexNames.STAGE)
+    val shuffleWriteTime: Long,
+    @KVIndexParam(value = TaskIndexNames.SHUFFLE_WRITE_RECORDS, parent = TaskIndexNames.STAGE)
+    val shuffleRecordsWritten: Long,
+
     val stageId: Int,
     val stageAttemptId: Int) {
 
-  @JsonIgnore @KVIndex
-  def id: Long = info.taskId
+  def hasMetrics: Boolean = executorDeserializeTime >= 0
+
+  def toApi: TaskData = {
+    val metrics = if (hasMetrics) {
+      Some(new TaskMetrics(
+        executorDeserializeTime,
+        executorDeserializeCpuTime,
+        executorRunTime,
+        executorCpuTime,
+        resultSize,
+        jvmGcTime,
+        resultSerializationTime,
+        memoryBytesSpilled,
+        diskBytesSpilled,
+        peakExecutionMemory,
+        new InputMetrics(
+          inputBytesRead,
+          inputRecordsRead),
+        new OutputMetrics(
+          outputBytesWritten,
+          outputRecordsWritten),
+        new ShuffleReadMetrics(
+          shuffleRemoteBlocksFetched,
+          shuffleLocalBlocksFetched,
+          shuffleFetchWaitTime,
+          shuffleRemoteBytesRead,
+          shuffleRemoteBytesReadToDisk,
+          shuffleLocalBytesRead,
+          shuffleRecordsRead),
+        new ShuffleWriteMetrics(
+          shuffleBytesWritten,
+          shuffleWriteTime,
+          shuffleRecordsWritten)))
+    } else {
+      None
+    }
 
-  @JsonIgnore @KVIndex("stage")
-  def stage: Array[Int] = Array(stageId, stageAttemptId)
+    new TaskData(
+      taskId,
+      index,
+      attempt,
+      new Date(launchTime),
+      if (resultFetchStart > 0L) Some(new Date(resultFetchStart)) else None,
+      if (duration > 0L) Some(duration) else None,
+      executorId,
+      host,
+      status,
+      taskLocality,
+      speculative,
+      accumulatorUpdates,
+      errorMessage,
+      metrics)
+  }
+
+  @JsonIgnore @KVIndex(TaskIndexNames.STAGE)
+  private def stage: Array[Int] = Array(stageId, stageAttemptId)
 
-  @JsonIgnore @KVIndex("runtime")
-  def runtime: Array[AnyRef] = {
-    val _runtime = info.taskMetrics.map(_.executorRunTime).getOrElse(-1L)
-    Array(stageId: JInteger, stageAttemptId: JInteger, _runtime: JLong)
+  @JsonIgnore @KVIndex(value = TaskIndexNames.SCHEDULER_DELAY, parent = TaskIndexNames.STAGE)
+  def schedulerDelay: Long = {
+    if (hasMetrics) {
+      AppStatusUtils.schedulerDelay(launchTime, resultFetchStart, duration, executorDeserializeTime,
+        resultSerializationTime, executorRunTime)
+    } else {
+      -1L
+    }
   }
 
-  @JsonIgnore @KVIndex("startTime")
-  def startTime: Array[AnyRef] = {
-    Array(stageId: JInteger, stageAttemptId: JInteger, info.launchTime.getTime(): JLong)
+  @JsonIgnore @KVIndex(value = TaskIndexNames.GETTING_RESULT_TIME, parent = TaskIndexNames.STAGE)
+  def gettingResultTime: Long = {
+    if (hasMetrics) {
+      AppStatusUtils.gettingResultTime(launchTime, resultFetchStart, duration)
+    } else {
+      -1L
+    }
   }
 
-  @JsonIgnore @KVIndex("active")
-  def active: Boolean = info.duration.isEmpty
+  /**
+   * Sorting by accumulators is a bit unusual, and the previous behavior could generate
+   * extremely long keys in the index, so this implementation only considers the first
+   * accumulator and its String representation.
+   */
+  @JsonIgnore @KVIndex(value = TaskIndexNames.ACCUMULATORS, parent = TaskIndexNames.STAGE)
+  private def accumulators: String = {
+    if (accumulatorUpdates.nonEmpty) {
+      val acc = accumulatorUpdates.head
+      s"${acc.name}:${acc.value}"
+    } else {
+      ""
+    }
+  }
+
+  @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = TaskIndexNames.STAGE)
+  private def shuffleTotalReads: Long = {
+    if (hasMetrics) {
+      shuffleLocalBytesRead + shuffleRemoteBytesRead
+    } else {
+      -1L
+    }
+  }
+
+  @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = TaskIndexNames.STAGE)
+  private def shuffleTotalBlocks: Long = {
+    if (hasMetrics) {
+      shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched
+    } else {
+      -1L
+    }
+  }
+
+  @JsonIgnore @KVIndex(value = TaskIndexNames.ERROR, parent = TaskIndexNames.STAGE)
+  private def error: String = if (errorMessage.isDefined) errorMessage.get else ""
 
 }
 
@@ -134,10 +356,13 @@ private[spark] class ExecutorStageSummaryWrapper(
     val info: ExecutorStageSummary) {
 
   @JsonIgnore @KVIndex
-  val id: Array[Any] = Array(stageId, stageAttemptId, executorId)
+  private val _id: Array[Any] = Array(stageId, stageAttemptId, executorId)
 
   @JsonIgnore @KVIndex("stage")
-  private[this] val stage: Array[Int] = Array(stageId, stageAttemptId)
+  private def stage: Array[Int] = Array(stageId, stageAttemptId)
+
+  @JsonIgnore
+  def id: Array[Any] = _id
 
 }
 
@@ -203,3 +428,53 @@ private[spark] class AppSummary(
   def id: String = classOf[AppSummary].getName()
 
 }
+
+/**
+ * A cached view of a specific quantile for one stage attempt's metrics.
+ */
+private[spark] class CachedQuantile(
+    val stageId: Int,
+    val stageAttemptId: Int,
+    val quantile: String,
+    val taskCount: Long,
+
+    // The following fields are an exploded view of a single entry for TaskMetricDistributions.
+    val executorDeserializeTime: Double,
+    val executorDeserializeCpuTime: Double,
+    val executorRunTime: Double,
+    val executorCpuTime: Double,
+    val resultSize: Double,
+    val jvmGcTime: Double,
+    val resultSerializationTime: Double,
+    val gettingResultTime: Double,
+    val schedulerDelay: Double,
+    val peakExecutionMemory: Double,
+    val memoryBytesSpilled: Double,
+    val diskBytesSpilled: Double,
+
+    val bytesRead: Double,
+    val recordsRead: Double,
+
+    val bytesWritten: Double,
+    val recordsWritten: Double,
+
+    val shuffleReadBytes: Double,
+    val shuffleRecordsRead: Double,
+    val shuffleRemoteBlocksFetched: Double,
+    val shuffleLocalBlocksFetched: Double,
+    val shuffleFetchWaitTime: Double,
+    val shuffleRemoteBytesRead: Double,
+    val shuffleRemoteBytesReadToDisk: Double,
+    val shuffleTotalBlocksFetched: Double,
+
+    val shuffleWriteBytes: Double,
+    val shuffleWriteRecords: Double,
+    val shuffleWriteTime: Double) {
+
+  @KVIndex @JsonIgnore
+  def id: Array[Any] = Array(stageId, stageAttemptId, quantile)
+
+  @KVIndex("stage") @JsonIgnore
+  def stage: Array[Int] = Array(stageId, stageAttemptId)
+
+}

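Most of the @KVIndexParam annotations above register per-stage secondary indices (parent = TaskIndexNames.STAGE); those are what let the store return one sorted, paged slice of a stage's tasks without deserializing the rest. A rough sketch of how such an index can be consumed through the KVStore view API; the real paging logic in AppStatusStore additionally deals with sort direction, status filtering, and tasks that have not reported metrics yet:

  import scala.collection.JavaConverters._

  import org.apache.spark.status.{TaskDataWrapper, TaskIndexNames}
  import org.apache.spark.util.kvstore.KVStore

  // One page of a stage attempt's tasks ordered by executor run time, read via
  // the per-stage "ert" index instead of scanning every stored task.
  def runTimePage(store: KVStore, stageId: Int, attemptId: Int,
      offset: Int, pageSize: Int): Seq[TaskDataWrapper] = {
    store.view(classOf[TaskDataWrapper])
      .parent(Array(stageId, attemptId))
      .index(TaskIndexNames.EXEC_RUN_TIME)
      .skip(offset)
      .max(pageSize)
      .asScala
      .toSeq
  }
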
http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 41d42b5..95c12b1 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -87,7 +87,9 @@ private[ui] class ExecutorTable(stage: StageData, store: AppStatusStore) {
   }
 
   private def createExecutorTable(stage: StageData) : Seq[Node] = {
-    stage.executorSummary.getOrElse(Map.empty).toSeq.sortBy(_._1).map { case (k, v) =>
+    val executorSummary = store.executorSummary(stage.stageId, stage.attemptId)
+
+    executorSummary.toSeq.sortBy(_._1).map { case (k, v) =>
       val executor = store.asOption(store.executorSummary(k))
       <tr>
         <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/1c70da3b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 740f12e7..bf59152 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -201,7 +201,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
     val stages = jobData.stageIds.map { stageId =>
       // This could be empty if the listener hasn't received information about the
       // stage or if the stage information has been garbage collected
-      store.stageData(stageId).lastOption.getOrElse {
+      store.asOption(store.lastStageAttempt(stageId)).getOrElse {
         new v1.StageData(
           v1.StageStatus.PENDING,
           stageId,

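store.asOption(...) is the idiom used here to turn the KVStore's missing-entry exception into an Option, so the job page can fall back to the synthetic PENDING StageData constructed in the lines that follow. The helper amounts to the following sketch; the actual method lives in AppStatusStore:

  // Sketch of the asOption idiom used above: a missing KVStore entry becomes
  // None instead of a NoSuchElementException escaping into the UI layer.
  def asOption[T](fn: => T): Option[T] = {
    try {
      Some(fn)
    } catch {
      case _: NoSuchElementException => None
    }
  }

  // e.g. store.asOption(store.lastStageAttempt(stageId)).getOrElse(pendingStageData)
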
