Posted to commits@spark.apache.org by ir...@apache.org on 2017/10/26 16:06:15 UTC

spark git commit: [SPARK-20643][CORE] Add listener implementation to collect app state.

Repository: spark
Updated Branches:
  refs/heads/master a83d8d5ad -> 0e9a750a8


[SPARK-20643][CORE] Add listener implementation to collect app state.

The initial listener code is based on the existing JobProgressListener (and others),
and tries to mimic their behavior as much as possible. The change also includes
some minor code movement so that some types and methods from the initial history
server code can be reused.

The code introduces a few mutable versions of public API types, used internally,
to make it easier to update information without ugly copy methods, and also to
make certain updates cheaper.
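
A rough sketch of that pattern (the real classes are the LiveEntity subclasses
added below; the names and the plain Map used as the snapshot here are
illustrative only):

    import org.apache.spark.util.kvstore.KVStore

    // Simplified sketch: mutable counters live on the tracking object, and
    // doUpdate() builds an immutable snapshot that is written to the store.
    abstract class LiveEntitySketch {
      def write(store: KVStore): Unit = store.write(doUpdate())
      protected def doUpdate(): Any
    }

    class LiveJobSketch(jobId: Int) extends LiveEntitySketch {
      var activeTasks = 0      // updated in place on every task event
      var completedTasks = 0   // no copies of an immutable API type needed

      override protected def doUpdate(): Any =
        // The real code builds a v1.JobData; a plain Map keeps the sketch short.
        Map("jobId" -> jobId, "active" -> activeTasks, "completed" -> completedTasks)
    }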

Note the code here is not yet 100% correct. It is meant as groundwork for the
UI integration in the next milestones. As different parts of the UI are ported,
the corresponding parts of this code will be fixed to account for the needed
behavior.

I also added annotations to API types so that Jackson is able to correctly
deserialize options, sequences and maps that store primitive types.
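
Trimmed to a single field, the pattern applied in api.scala below looks like
this; the class name is illustrative, and the annotation is needed because the
erased element type of Option[Long] would otherwise typically be deserialized
as a boxed Integer:

    import java.lang.{Long => JLong}

    import com.fasterxml.jackson.databind.annotation.JsonDeserialize

    // Illustrative class; mirrors the annotation added to api.scala.
    class MemoryFieldExample(
        // Without contentAs, Jackson only sees the erased Option element type
        // and may hand back a java.lang.Integer for small values.
        @JsonDeserialize(contentAs = classOf[JLong])
        val onHeapMemoryUsed: Option[Long])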

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #19383 from vanzin/SPARK-20643.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e9a750a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e9a750a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e9a750a

Branch: refs/heads/master
Commit: 0e9a750a8d389b3a17834584d31c204c77c6970d
Parents: a83d8d5
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Oct 26 11:05:16 2017 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Thu Oct 26 11:05:16 2017 -0500

----------------------------------------------------------------------
 .../apache/spark/util/kvstore/KVTypeInfo.java   |   2 +
 .../org/apache/spark/util/kvstore/LevelDB.java  |   2 +-
 .../apache/spark/status/api/v1/StageStatus.java |   3 +-
 .../deploy/history/FsHistoryProvider.scala      |  37 +-
 .../apache/spark/deploy/history/config.scala    |   6 -
 .../apache/spark/status/AppStatusListener.scala | 531 ++++++++++++++
 .../scala/org/apache/spark/status/KVUtils.scala |  73 ++
 .../org/apache/spark/status/LiveEntity.scala    | 526 ++++++++++++++
 .../spark/status/api/v1/AllStagesResource.scala |   4 +-
 .../org/apache/spark/status/api/v1/api.scala    |  11 +-
 .../org/apache/spark/status/storeTypes.scala    |  98 +++
 .../deploy/history/FsHistoryProviderSuite.scala |   2 +-
 .../spark/status/AppStatusListenerSuite.scala   | 690 +++++++++++++++++++
 project/MimaExcludes.scala                      |   2 +
 14 files changed, 1942 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
index a2b077e..870b484 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVTypeInfo.java
@@ -46,6 +46,7 @@ public class KVTypeInfo {
       KVIndex idx = f.getAnnotation(KVIndex.class);
       if (idx != null) {
         checkIndex(idx, indices);
+        f.setAccessible(true);
         indices.put(idx.value(), idx);
         f.setAccessible(true);
         accessors.put(idx.value(), new FieldAccessor(f));
@@ -58,6 +59,7 @@ public class KVTypeInfo {
         checkIndex(idx, indices);
         Preconditions.checkArgument(m.getParameterTypes().length == 0,
           "Annotated method %s::%s should not have any parameters.", type.getName(), m.getName());
+        m.setAccessible(true);
         indices.put(idx.value(), idx);
         m.setAccessible(true);
         accessors.put(idx.value(), new MethodAccessor(m));

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
----------------------------------------------------------------------
diff --git a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
index ff48b15..4f9e10c 100644
--- a/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
+++ b/common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java
@@ -76,7 +76,7 @@ public class LevelDB implements KVStore {
     this.types = new ConcurrentHashMap<>();
 
     Options options = new Options();
-    options.createIfMissing(!path.exists());
+    options.createIfMissing(true);
     this._db = new AtomicReference<>(JniDBFactory.factory.open(path, options));
 
     byte[] versionData = db().get(STORE_VERSION_KEY);

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
index 9dbb565..40b5f62 100644
--- a/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
+++ b/core/src/main/java/org/apache/spark/status/api/v1/StageStatus.java
@@ -23,7 +23,8 @@ public enum StageStatus {
   ACTIVE,
   COMPLETE,
   FAILED,
-  PENDING;
+  PENDING,
+  SKIPPED;
 
   public static StageStatus fromString(String str) {
     return EnumUtil.parseIgnoreCase(StageStatus.class, str);

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 3889dd0..cf97597 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -42,6 +42,7 @@ import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.ReplayListenerBus._
+import org.apache.spark.status.KVUtils._
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -129,29 +130,15 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Visible for testing.
   private[history] val listing: KVStore = storePath.map { path =>
     val dbPath = new File(path, "listing.ldb")
-
-    def openDB(): LevelDB = new LevelDB(dbPath, new KVStoreScalaSerializer())
+    val metadata = new FsHistoryProviderMetadata(CURRENT_LISTING_VERSION, logDir.toString())
 
     try {
-      val db = openDB()
-      val meta = db.getMetadata(classOf[KVStoreMetadata])
-
-      if (meta == null) {
-        db.setMetadata(new KVStoreMetadata(CURRENT_LISTING_VERSION, logDir))
-        db
-      } else if (meta.version != CURRENT_LISTING_VERSION || !logDir.equals(meta.logDir)) {
-        logInfo("Detected mismatched config in existing DB, deleting...")
-        db.close()
-        Utils.deleteRecursively(dbPath)
-        openDB()
-      } else {
-        db
-      }
+      open(new File(path, "listing.ldb"), metadata)
     } catch {
-      case _: UnsupportedStoreVersionException =>
+      case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
         logInfo("Detected incompatible DB versions, deleting...")
         Utils.deleteRecursively(dbPath)
-        openDB()
+        open(new File(path, "listing.ldb"), metadata)
     }
   }.getOrElse(new InMemoryStore())
 
@@ -720,19 +707,7 @@ private[history] object FsHistoryProvider {
   private[history] val CURRENT_LISTING_VERSION = 1L
 }
 
-/**
- * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
- * the API serializer.
- */
-private class KVStoreScalaSerializer extends KVStoreSerializer {
-
-  mapper.registerModule(DefaultScalaModule)
-  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
-  mapper.setDateFormat(v1.JacksonMessageWriter.makeISODateFormat)
-
-}
-
-private[history] case class KVStoreMetadata(
+private[history] case class FsHistoryProviderMetadata(
   version: Long,
   logDir: String)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/config.scala b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
index fb9e997..52dedc1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/config.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/config.scala
@@ -19,16 +19,10 @@ package org.apache.spark.deploy.history
 
 import java.util.concurrent.TimeUnit
 
-import scala.annotation.meta.getter
-
 import org.apache.spark.internal.config.ConfigBuilder
-import org.apache.spark.util.kvstore.KVIndex
 
 private[spark] object config {
 
-  /** Use this to annotate constructor params to be used as KVStore indices. */
-  type KVIndexParam = KVIndex @getter
-
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 
   val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
new file mode 100644
index 0000000..f120685
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.Date
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A Spark listener that writes application information to a data store. The types written to the
+ * store are defined in the `storeTypes.scala` file and are based on the public REST API.
+ */
+private class AppStatusListener(kvstore: KVStore) extends SparkListener with Logging {
+
+  private var sparkVersion = SPARK_VERSION
+  private var appInfo: v1.ApplicationInfo = null
+  private var coresPerTask: Int = 1
+
+  // Keep track of live entities, so that task metrics can be efficiently updated (without
+  // causing too many writes to the underlying store, and other expensive operations).
+  private val liveStages = new HashMap[(Int, Int), LiveStage]()
+  private val liveJobs = new HashMap[Int, LiveJob]()
+  private val liveExecutors = new HashMap[String, LiveExecutor]()
+  private val liveTasks = new HashMap[Long, LiveTask]()
+  private val liveRDDs = new HashMap[Int, LiveRDD]()
+
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case SparkListenerLogStart(version) => sparkVersion = version
+    case _ =>
+  }
+
+  override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
+    assert(event.appId.isDefined, "Applications without IDs are not supported.")
+
+    val attempt = new v1.ApplicationAttemptInfo(
+      event.appAttemptId,
+      new Date(event.time),
+      new Date(-1),
+      new Date(event.time),
+      -1L,
+      event.sparkUser,
+      false,
+      sparkVersion)
+
+    appInfo = new v1.ApplicationInfo(
+      event.appId.get,
+      event.appName,
+      None,
+      None,
+      None,
+      None,
+      Seq(attempt))
+
+    kvstore.write(new ApplicationInfoWrapper(appInfo))
+  }
+
+  override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
+    val old = appInfo.attempts.head
+    val attempt = new v1.ApplicationAttemptInfo(
+      old.attemptId,
+      old.startTime,
+      new Date(event.time),
+      new Date(event.time),
+      event.time - old.startTime.getTime(),
+      old.sparkUser,
+      true,
+      old.appSparkVersion)
+
+    appInfo = new v1.ApplicationInfo(
+      appInfo.id,
+      appInfo.name,
+      None,
+      None,
+      None,
+      None,
+      Seq(attempt))
+    kvstore.write(new ApplicationInfoWrapper(appInfo))
+  }
+
+  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+    // This needs to be an update in case an executor re-registers after the driver has
+    // marked it as "dead".
+    val exec = getOrCreateExecutor(event.executorId)
+    exec.host = event.executorInfo.executorHost
+    exec.isActive = true
+    exec.totalCores = event.executorInfo.totalCores
+    exec.maxTasks = event.executorInfo.totalCores / coresPerTask
+    exec.executorLogs = event.executorInfo.logUrlMap
+    update(exec)
+  }
+
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+    liveExecutors.remove(event.executorId).foreach { exec =>
+      exec.isActive = false
+      update(exec)
+    }
+  }
+
+  override def onExecutorBlacklisted(event: SparkListenerExecutorBlacklisted): Unit = {
+    updateBlackListStatus(event.executorId, true)
+  }
+
+  override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
+    updateBlackListStatus(event.executorId, false)
+  }
+
+  override def onNodeBlacklisted(event: SparkListenerNodeBlacklisted): Unit = {
+    updateNodeBlackList(event.hostId, true)
+  }
+
+  override def onNodeUnblacklisted(event: SparkListenerNodeUnblacklisted): Unit = {
+    updateNodeBlackList(event.hostId, false)
+  }
+
+  private def updateBlackListStatus(execId: String, blacklisted: Boolean): Unit = {
+    liveExecutors.get(execId).foreach { exec =>
+      exec.isBlacklisted = blacklisted
+      update(exec)
+    }
+  }
+
+  private def updateNodeBlackList(host: String, blacklisted: Boolean): Unit = {
+    // Implicitly (un)blacklist every executor associated with the node.
+    liveExecutors.values.foreach { exec =>
+      if (exec.hostname == host) {
+        exec.isBlacklisted = blacklisted
+        update(exec)
+      }
+    }
+  }
+
+  override def onJobStart(event: SparkListenerJobStart): Unit = {
+    // Compute (a potential over-estimate of) the number of tasks that will be run by this job.
+    // This may be an over-estimate because the job start event references all of the result
+    // stages' transitive stage dependencies, but some of these stages might be skipped if their
+    // output is available from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
+    val numTasks = {
+      val missingStages = event.stageInfos.filter(_.completionTime.isEmpty)
+      missingStages.map(_.numTasks).sum
+    }
+
+    val lastStageInfo = event.stageInfos.lastOption
+    val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+
+    val jobGroup = Option(event.properties)
+      .flatMap { p => Option(p.getProperty(SparkContext.SPARK_JOB_GROUP_ID)) }
+
+    val job = new LiveJob(
+      event.jobId,
+      lastStageName,
+      Some(new Date(event.time)),
+      event.stageIds,
+      jobGroup,
+      numTasks)
+    liveJobs.put(event.jobId, job)
+    update(job)
+
+    event.stageInfos.foreach { stageInfo =>
+      // A new job submission may re-use an existing stage, so this code needs to do an update
+      // instead of just a write.
+      val stage = getOrCreateStage(stageInfo)
+      stage.jobs :+= job
+      stage.jobIds += event.jobId
+      update(stage)
+    }
+  }
+
+  override def onJobEnd(event: SparkListenerJobEnd): Unit = {
+    liveJobs.remove(event.jobId).foreach { job =>
+      job.status = event.jobResult match {
+        case JobSucceeded => JobExecutionStatus.SUCCEEDED
+        case JobFailed(_) => JobExecutionStatus.FAILED
+      }
+
+      job.completionTime = Some(new Date(event.time))
+      update(job)
+    }
+  }
+
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    val stage = getOrCreateStage(event.stageInfo)
+    stage.status = v1.StageStatus.ACTIVE
+    stage.schedulingPool = Option(event.properties).flatMap { p =>
+      Option(p.getProperty("spark.scheduler.pool"))
+    }.getOrElse(SparkUI.DEFAULT_POOL_NAME)
+
+    // Look at all active jobs to find the ones that mention this stage.
+    stage.jobs = liveJobs.values
+      .filter(_.stageIds.contains(event.stageInfo.stageId))
+      .toSeq
+    stage.jobIds = stage.jobs.map(_.jobId).toSet
+
+    stage.jobs.foreach { job =>
+      job.completedStages = job.completedStages - event.stageInfo.stageId
+      job.activeStages += 1
+      update(job)
+    }
+
+    event.stageInfo.rddInfos.foreach { info =>
+      if (info.storageLevel.isValid) {
+        update(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)))
+      }
+    }
+
+    update(stage)
+  }
+
+  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
+    val task = new LiveTask(event.taskInfo, event.stageId, event.stageAttemptId)
+    liveTasks.put(event.taskInfo.taskId, task)
+    update(task)
+
+    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+      stage.activeTasks += 1
+      stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
+      update(stage)
+
+      stage.jobs.foreach { job =>
+        job.activeTasks += 1
+        update(job)
+      }
+    }
+
+    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
+      exec.activeTasks += 1
+      exec.totalTasks += 1
+      update(exec)
+    }
+  }
+
+  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit = {
+    // Call update on the task so that the "getting result" time is written to the store; the
+    // value is part of the mutable TaskInfo state that the live entity already references.
+    liveTasks.get(event.taskInfo.taskId).foreach { task =>
+      update(task)
+    }
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    // TODO: can this really happen?
+    if (event.taskInfo == null) {
+      return
+    }
+
+    val metricsDelta = liveTasks.remove(event.taskInfo.taskId).map { task =>
+      val errorMessage = event.reason match {
+        case Success =>
+          None
+        case k: TaskKilled =>
+          Some(k.reason)
+        case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
+          Some(e.toErrorString)
+        case e: TaskFailedReason => // All other failure cases
+          Some(e.toErrorString)
+        case other =>
+          logInfo(s"Unhandled task end reason: $other")
+          None
+      }
+      task.errorMessage = errorMessage
+      val delta = task.updateMetrics(event.taskMetrics)
+      update(task)
+      delta
+    }.orNull
+
+    val (completedDelta, failedDelta) = event.reason match {
+      case Success =>
+        (1, 0)
+      case _ =>
+        (0, 1)
+    }
+
+    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+      if (metricsDelta != null) {
+        stage.metrics.update(metricsDelta)
+      }
+      stage.activeTasks -= 1
+      stage.completedTasks += completedDelta
+      stage.failedTasks += failedDelta
+      update(stage)
+
+      stage.jobs.foreach { job =>
+        job.activeTasks -= 1
+        job.completedTasks += completedDelta
+        job.failedTasks += failedDelta
+        update(job)
+      }
+
+      val esummary = stage.executorSummary(event.taskInfo.executorId)
+      esummary.taskTime += event.taskInfo.duration
+      esummary.succeededTasks += completedDelta
+      esummary.failedTasks += failedDelta
+      if (metricsDelta != null) {
+        esummary.metrics.update(metricsDelta)
+      }
+      update(esummary)
+    }
+
+    liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
+      if (event.taskMetrics != null) {
+        val readMetrics = event.taskMetrics.shuffleReadMetrics
+        exec.totalGcTime += event.taskMetrics.jvmGCTime
+        exec.totalInputBytes += event.taskMetrics.inputMetrics.bytesRead
+        exec.totalShuffleRead += readMetrics.localBytesRead + readMetrics.remoteBytesRead
+        exec.totalShuffleWrite += event.taskMetrics.shuffleWriteMetrics.bytesWritten
+      }
+
+      exec.activeTasks -= 1
+      exec.completedTasks += completedDelta
+      exec.failedTasks += failedDelta
+      exec.totalDuration += event.taskInfo.duration
+      update(exec)
+    }
+  }
+
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage =>
+      stage.info = event.stageInfo
+
+      // Because of SPARK-20205, old event logs may contain valid stages without a submission time
+      // in their start event. In those cases, we can only detect whether a stage was skipped by
+      // waiting until the completion event, at which point the field would have been set.
+      stage.status = event.stageInfo.failureReason match {
+        case Some(_) => v1.StageStatus.FAILED
+        case _ if event.stageInfo.submissionTime.isDefined => v1.StageStatus.COMPLETE
+        case _ => v1.StageStatus.SKIPPED
+      }
+      update(stage)
+
+      stage.jobs.foreach { job =>
+        stage.status match {
+          case v1.StageStatus.COMPLETE =>
+            job.completedStages += event.stageInfo.stageId
+          case v1.StageStatus.SKIPPED =>
+            job.skippedStages += event.stageInfo.stageId
+            job.skippedTasks += event.stageInfo.numTasks
+          case _ =>
+            job.failedStages += 1
+        }
+        job.activeStages -= 1
+        update(job)
+      }
+
+      stage.executorSummaries.values.foreach(update)
+      update(stage)
+    }
+  }
+
+  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
+    // This needs to set fields that are already set by onExecutorAdded because the driver is
+    // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event.
+    val exec = getOrCreateExecutor(event.blockManagerId.executorId)
+    exec.hostPort = event.blockManagerId.hostPort
+    event.maxOnHeapMem.foreach { _ =>
+      exec.totalOnHeap = event.maxOnHeapMem.get
+      exec.totalOffHeap = event.maxOffHeapMem.get
+    }
+    exec.isActive = true
+    exec.maxMemory = event.maxMem
+    update(exec)
+  }
+
+  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit = {
+    // Nothing to do here. Covered by onExecutorRemoved.
+  }
+
+  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
+    liveRDDs.remove(event.rddId)
+    kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
+  }
+
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    event.accumUpdates.foreach { case (taskId, sid, sAttempt, accumUpdates) =>
+      liveTasks.get(taskId).foreach { task =>
+        val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
+        val delta = task.updateMetrics(metrics)
+        update(task)
+
+        liveStages.get((sid, sAttempt)).foreach { stage =>
+          stage.metrics.update(delta)
+          update(stage)
+
+          val esummary = stage.executorSummary(event.execId)
+          esummary.metrics.update(delta)
+          update(esummary)
+        }
+      }
+    }
+  }
+
+  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+    event.blockUpdatedInfo.blockId match {
+      case block: RDDBlockId => updateRDDBlock(event, block)
+      case _ => // TODO: API only covers RDD storage.
+    }
+  }
+
+  private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
+    val executorId = event.blockUpdatedInfo.blockManagerId.executorId
+
+    // Whether values are being added to or removed from the existing accounting.
+    val storageLevel = event.blockUpdatedInfo.storageLevel
+    val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
+    val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
+
+    // Function to apply a delta to a value, but ensure that it doesn't go negative.
+    def newValue(old: Long, delta: Long): Long = math.max(0, old + delta)
+
+    val updatedStorageLevel = if (storageLevel.isValid) {
+      Some(storageLevel.description)
+    } else {
+      None
+    }
+
+    // We need information about the executor to update some memory accounting values in the
+    // RDD info, so read that beforehand.
+    val maybeExec = liveExecutors.get(executorId)
+    var rddBlocksDelta = 0
+
+    // Update the block entry in the RDD info, keeping track of the deltas above so that we
+    // can update the executor information too.
+    liveRDDs.get(block.rddId).foreach { rdd =>
+      val partition = rdd.partition(block.name)
+
+      val executors = if (updatedStorageLevel.isDefined) {
+        if (!partition.executors.contains(executorId)) {
+          rddBlocksDelta = 1
+        }
+        partition.executors + executorId
+      } else {
+        rddBlocksDelta = -1
+        partition.executors - executorId
+      }
+
+      // Only update the partition if it's still stored in some executor, otherwise get rid of it.
+      if (executors.nonEmpty) {
+        if (updatedStorageLevel.isDefined) {
+          partition.storageLevel = updatedStorageLevel.get
+        }
+        partition.memoryUsed = newValue(partition.memoryUsed, memoryDelta)
+        partition.diskUsed = newValue(partition.diskUsed, diskDelta)
+        partition.executors = executors
+      } else {
+        rdd.removePartition(block.name)
+      }
+
+      maybeExec.foreach { exec =>
+        if (exec.rddBlocks + rddBlocksDelta > 0) {
+          val dist = rdd.distribution(exec)
+          dist.memoryRemaining = newValue(dist.memoryRemaining, -memoryDelta)
+          dist.memoryUsed = newValue(dist.memoryUsed, memoryDelta)
+          dist.diskUsed = newValue(dist.diskUsed, diskDelta)
+
+          if (exec.hasMemoryInfo) {
+            if (storageLevel.useOffHeap) {
+              dist.offHeapUsed = newValue(dist.offHeapUsed, memoryDelta)
+              dist.offHeapRemaining = newValue(dist.offHeapRemaining, -memoryDelta)
+            } else {
+              dist.onHeapUsed = newValue(dist.onHeapUsed, memoryDelta)
+              dist.onHeapRemaining = newValue(dist.onHeapRemaining, -memoryDelta)
+            }
+          }
+        } else {
+          rdd.removeDistribution(exec)
+        }
+      }
+
+      if (updatedStorageLevel.isDefined) {
+        rdd.storageLevel = updatedStorageLevel.get
+      }
+      rdd.memoryUsed = newValue(rdd.memoryUsed, memoryDelta)
+      rdd.diskUsed = newValue(rdd.diskUsed, diskDelta)
+      update(rdd)
+    }
+
+    maybeExec.foreach { exec =>
+      if (exec.hasMemoryInfo) {
+        if (storageLevel.useOffHeap) {
+          exec.usedOffHeap = newValue(exec.usedOffHeap, memoryDelta)
+        } else {
+          exec.usedOnHeap = newValue(exec.usedOnHeap, memoryDelta)
+        }
+      }
+      exec.memoryUsed = newValue(exec.memoryUsed, memoryDelta)
+      exec.diskUsed = newValue(exec.diskUsed, diskDelta)
+      exec.rddBlocks += rddBlocksDelta
+      if (exec.hasMemoryInfo || rddBlocksDelta != 0) {
+        update(exec)
+      }
+    }
+  }
+
+  private def getOrCreateExecutor(executorId: String): LiveExecutor = {
+    liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId))
+  }
+
+  private def getOrCreateStage(info: StageInfo): LiveStage = {
+    val stage = liveStages.getOrElseUpdate((info.stageId, info.attemptId), new LiveStage())
+    stage.info = info
+    stage
+  }
+
+  private def update(entity: LiveEntity): Unit = {
+    entity.write(kvstore)
+  }
+
+}
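
To make the listener's write path concrete, here is a rough sketch of driving
it by hand, in the spirit of the AppStatusListenerSuite added further down;
the object name, store location, metadata value and event values are
illustrative only:

    package org.apache.spark.status

    // Lives in this package because the listener and wrapper types are
    // package-private.
    import org.apache.spark.scheduler.SparkListenerApplicationStart
    import org.apache.spark.util.Utils

    object AppStatusListenerSketch {
      def main(args: Array[String]): Unit = {
        val store = KVUtils.open(Utils.createTempDir(), "sketch-metadata")
        val listener = new AppStatusListener(store)

        listener.onApplicationStart(SparkListenerApplicationStart(
          "name", Some("id"), System.currentTimeMillis(), "user", Some("attempt"), None))

        // The listener writes an ApplicationInfoWrapper keyed by the application id.
        val app = store.read(classOf[ApplicationInfoWrapper], "id")
        assert(app.info.name == "name")
        store.close()
      }
    }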

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/status/KVUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/KVUtils.scala b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
new file mode 100644
index 0000000..4638511
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/KVUtils.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.io.File
+
+import scala.annotation.meta.getter
+import scala.language.implicitConversions
+import scala.reflect.{classTag, ClassTag}
+
+import com.fasterxml.jackson.annotation.JsonInclude
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.kvstore._
+
+private[spark] object KVUtils extends Logging {
+
+  /** Use this to annotate constructor params to be used as KVStore indices. */
+  type KVIndexParam = KVIndex @getter
+
+  /**
+   * A KVStoreSerializer that provides Scala types serialization too, and uses the same options as
+   * the API serializer.
+   */
+  private[spark] class KVStoreScalaSerializer extends KVStoreSerializer {
+
+    mapper.registerModule(DefaultScalaModule)
+    mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
+
+  }
+
+  /**
+   * Open or create a LevelDB store.
+   *
+   * @param path Location of the store.
+   * @param metadata Metadata value to compare to the data in the store. If the store does not
+   *                 contain any metadata (e.g. it's a new store), this value is written as
+   *                 the store's metadata.
+   */
+  def open[M: ClassTag](path: File, metadata: M): LevelDB = {
+    require(metadata != null, "Metadata is required.")
+
+    val db = new LevelDB(path, new KVStoreScalaSerializer())
+    val dbMeta = db.getMetadata(classTag[M].runtimeClass)
+    if (dbMeta == null) {
+      db.setMetadata(metadata)
+    } else if (dbMeta != metadata) {
+      db.close()
+      throw new MetadataMismatchException()
+    }
+
+    db
+  }
+
+  private[spark] class MetadataMismatchException extends Exception
+
+}
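
Sketched on its own, the open-or-recreate pattern that FsHistoryProvider now
builds on top of this helper looks roughly like the following; the metadata
case class, object name and paths are illustrative only:

    package org.apache.spark.status

    import java.io.File

    import org.apache.spark.status.KVUtils._
    import org.apache.spark.util.Utils
    import org.apache.spark.util.kvstore.UnsupportedStoreVersionException

    // Illustrative metadata type; any Jackson-friendly value with a meaningful
    // equals() works.
    case class SketchMetadata(version: Long, logDir: String)

    object KVUtilsOpenSketch {
      def main(args: Array[String]): Unit = {
        val dbPath = new File(Utils.createTempDir(), "sketch.ldb")
        val metadata = SketchMetadata(1L, "file:/tmp/spark-events")

        val db = try {
          open(dbPath, metadata)
        } catch {
          // Version bumps and metadata mismatches are handled by wiping the store.
          case _: UnsupportedStoreVersionException | _: MetadataMismatchException =>
            Utils.deleteRecursively(dbPath)
            open(dbPath, metadata)
        }
        db.close()
      }
    }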

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
new file mode 100644
index 0000000..63fa365
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -0,0 +1,526 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.util.Date
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.ui.SparkUI
+import org.apache.spark.util.AccumulatorContext
+import org.apache.spark.util.kvstore.KVStore
+
+/**
+ * A mutable representation of a live entity in Spark (jobs, stages, tasks, et al). Every live
+ * entity uses one of these instances to keep track of its evolving state, and periodically
+ * flushes an immutable view of the entity to the app state store.
+ */
+private[spark] abstract class LiveEntity {
+
+  def write(store: KVStore): Unit = {
+    store.write(doUpdate())
+  }
+
+  /**
+   * Returns an updated view of entity data, to be stored in the status store, reflecting the
+   * latest information collected by the listener.
+   */
+  protected def doUpdate(): Any
+
+}
+
+private class LiveJob(
+    val jobId: Int,
+    name: String,
+    submissionTime: Option[Date],
+    val stageIds: Seq[Int],
+    jobGroup: Option[String],
+    numTasks: Int) extends LiveEntity {
+
+  var activeTasks = 0
+  var completedTasks = 0
+  var failedTasks = 0
+
+  var skippedTasks = 0
+  var skippedStages = Set[Int]()
+
+  var status = JobExecutionStatus.RUNNING
+  var completionTime: Option[Date] = None
+
+  var completedStages: Set[Int] = Set()
+  var activeStages = 0
+  var failedStages = 0
+
+  override protected def doUpdate(): Any = {
+    val info = new v1.JobData(
+      jobId,
+      name,
+      None, // description is always None?
+      submissionTime,
+      completionTime,
+      stageIds,
+      jobGroup,
+      status,
+      numTasks,
+      activeTasks,
+      completedTasks,
+      skippedTasks,
+      failedTasks,
+      activeStages,
+      completedStages.size,
+      skippedStages.size,
+      failedStages)
+    new JobDataWrapper(info, skippedStages)
+  }
+
+}
+
+private class LiveTask(
+    info: TaskInfo,
+    stageId: Int,
+    stageAttemptId: Int) extends LiveEntity {
+
+  import LiveEntityHelpers._
+
+  private var recordedMetrics: v1.TaskMetrics = null
+
+  var errorMessage: Option[String] = None
+
+  /**
+   * Update the metrics for the task and return the difference between the previous and new
+   * values.
+   */
+  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
+    if (metrics != null) {
+      val old = recordedMetrics
+      recordedMetrics = new v1.TaskMetrics(
+        metrics.executorDeserializeTime,
+        metrics.executorDeserializeCpuTime,
+        metrics.executorRunTime,
+        metrics.executorCpuTime,
+        metrics.resultSize,
+        metrics.jvmGCTime,
+        metrics.resultSerializationTime,
+        metrics.memoryBytesSpilled,
+        metrics.diskBytesSpilled,
+        new v1.InputMetrics(
+          metrics.inputMetrics.bytesRead,
+          metrics.inputMetrics.recordsRead),
+        new v1.OutputMetrics(
+          metrics.outputMetrics.bytesWritten,
+          metrics.outputMetrics.recordsWritten),
+        new v1.ShuffleReadMetrics(
+          metrics.shuffleReadMetrics.remoteBlocksFetched,
+          metrics.shuffleReadMetrics.localBlocksFetched,
+          metrics.shuffleReadMetrics.fetchWaitTime,
+          metrics.shuffleReadMetrics.remoteBytesRead,
+          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
+          metrics.shuffleReadMetrics.localBytesRead,
+          metrics.shuffleReadMetrics.recordsRead),
+        new v1.ShuffleWriteMetrics(
+          metrics.shuffleWriteMetrics.bytesWritten,
+          metrics.shuffleWriteMetrics.writeTime,
+          metrics.shuffleWriteMetrics.recordsWritten))
+      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
+    } else {
+      null
+    }
+  }
+
+  /**
+   * Return a new TaskMetrics object containing the delta of the various fields of the given
+   * metrics objects. This is currently targeted at updating stage data, so it does not
+   * necessarily calculate deltas for all the fields.
+   */
+  private def calculateMetricsDelta(
+      metrics: v1.TaskMetrics,
+      old: v1.TaskMetrics): v1.TaskMetrics = {
+    val shuffleWriteDelta = new v1.ShuffleWriteMetrics(
+      metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten,
+      0L,
+      metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten)
+
+    val shuffleReadDelta = new v1.ShuffleReadMetrics(
+      0L, 0L, 0L,
+      metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead,
+      metrics.shuffleReadMetrics.remoteBytesReadToDisk -
+        old.shuffleReadMetrics.remoteBytesReadToDisk,
+      metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead,
+      metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead)
+
+    val inputDelta = new v1.InputMetrics(
+      metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead,
+      metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead)
+
+    val outputDelta = new v1.OutputMetrics(
+      metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten,
+      metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten)
+
+    new v1.TaskMetrics(
+      0L, 0L,
+      metrics.executorRunTime - old.executorRunTime,
+      metrics.executorCpuTime - old.executorCpuTime,
+      0L, 0L, 0L,
+      metrics.memoryBytesSpilled - old.memoryBytesSpilled,
+      metrics.diskBytesSpilled - old.diskBytesSpilled,
+      inputDelta,
+      outputDelta,
+      shuffleReadDelta,
+      shuffleWriteDelta)
+  }
+
+  override protected def doUpdate(): Any = {
+    val task = new v1.TaskData(
+      info.taskId,
+      info.index,
+      info.attemptNumber,
+      new Date(info.launchTime),
+      if (info.finished) Some(info.duration) else None,
+      info.executorId,
+      info.host,
+      info.status,
+      info.taskLocality.toString(),
+      info.speculative,
+      newAccumulatorInfos(info.accumulables),
+      errorMessage,
+      Option(recordedMetrics))
+    new TaskDataWrapper(task)
+  }
+
+}
+
+private class LiveExecutor(val executorId: String) extends LiveEntity {
+
+  var hostPort: String = null
+  var host: String = null
+  var isActive = true
+  var totalCores = 0
+
+  var rddBlocks = 0
+  var memoryUsed = 0L
+  var diskUsed = 0L
+  var maxTasks = 0
+  var maxMemory = 0L
+
+  var totalTasks = 0
+  var activeTasks = 0
+  var completedTasks = 0
+  var failedTasks = 0
+  var totalDuration = 0L
+  var totalGcTime = 0L
+  var totalInputBytes = 0L
+  var totalShuffleRead = 0L
+  var totalShuffleWrite = 0L
+  var isBlacklisted = false
+
+  var executorLogs = Map[String, String]()
+
+  // Memory metrics. They may not be recorded (e.g. old event logs) so if totalOnHeap is not
+  // initialized, the store will not contain this information.
+  var totalOnHeap = -1L
+  var totalOffHeap = 0L
+  var usedOnHeap = 0L
+  var usedOffHeap = 0L
+
+  def hasMemoryInfo: Boolean = totalOnHeap >= 0L
+
+  def hostname: String = if (host != null) host else hostPort.split(":")(0)
+
+  override protected def doUpdate(): Any = {
+    val memoryMetrics = if (totalOnHeap >= 0) {
+      Some(new v1.MemoryMetrics(usedOnHeap, usedOffHeap, totalOnHeap, totalOffHeap))
+    } else {
+      None
+    }
+
+    val info = new v1.ExecutorSummary(
+      executorId,
+      if (hostPort != null) hostPort else host,
+      isActive,
+      rddBlocks,
+      memoryUsed,
+      diskUsed,
+      totalCores,
+      maxTasks,
+      activeTasks,
+      failedTasks,
+      completedTasks,
+      totalTasks,
+      totalDuration,
+      totalGcTime,
+      totalInputBytes,
+      totalShuffleRead,
+      totalShuffleWrite,
+      isBlacklisted,
+      maxMemory,
+      executorLogs,
+      memoryMetrics)
+    new ExecutorSummaryWrapper(info)
+  }
+
+}
+
+/** Metrics tracked per stage (both total and per executor). */
+private class MetricsTracker {
+  var executorRunTime = 0L
+  var executorCpuTime = 0L
+  var inputBytes = 0L
+  var inputRecords = 0L
+  var outputBytes = 0L
+  var outputRecords = 0L
+  var shuffleReadBytes = 0L
+  var shuffleReadRecords = 0L
+  var shuffleWriteBytes = 0L
+  var shuffleWriteRecords = 0L
+  var memoryBytesSpilled = 0L
+  var diskBytesSpilled = 0L
+
+  def update(delta: v1.TaskMetrics): Unit = {
+    executorRunTime += delta.executorRunTime
+    executorCpuTime += delta.executorCpuTime
+    inputBytes += delta.inputMetrics.bytesRead
+    inputRecords += delta.inputMetrics.recordsRead
+    outputBytes += delta.outputMetrics.bytesWritten
+    outputRecords += delta.outputMetrics.recordsWritten
+    shuffleReadBytes += delta.shuffleReadMetrics.localBytesRead +
+      delta.shuffleReadMetrics.remoteBytesRead
+    shuffleReadRecords += delta.shuffleReadMetrics.recordsRead
+    shuffleWriteBytes += delta.shuffleWriteMetrics.bytesWritten
+    shuffleWriteRecords += delta.shuffleWriteMetrics.recordsWritten
+    memoryBytesSpilled += delta.memoryBytesSpilled
+    diskBytesSpilled += delta.diskBytesSpilled
+  }
+
+}
+
+private class LiveExecutorStageSummary(
+    stageId: Int,
+    attemptId: Int,
+    executorId: String) extends LiveEntity {
+
+  var taskTime = 0L
+  var succeededTasks = 0
+  var failedTasks = 0
+  var killedTasks = 0
+
+  val metrics = new MetricsTracker()
+
+  override protected def doUpdate(): Any = {
+    val info = new v1.ExecutorStageSummary(
+      taskTime,
+      failedTasks,
+      succeededTasks,
+      metrics.inputBytes,
+      metrics.outputBytes,
+      metrics.shuffleReadBytes,
+      metrics.shuffleWriteBytes,
+      metrics.memoryBytesSpilled,
+      metrics.diskBytesSpilled)
+    new ExecutorStageSummaryWrapper(stageId, attemptId, executorId, info)
+  }
+
+}
+
+private class LiveStage extends LiveEntity {
+
+  import LiveEntityHelpers._
+
+  var jobs = Seq[LiveJob]()
+  var jobIds = Set[Int]()
+
+  var info: StageInfo = null
+  var status = v1.StageStatus.PENDING
+
+  var schedulingPool: String = SparkUI.DEFAULT_POOL_NAME
+
+  var activeTasks = 0
+  var completedTasks = 0
+  var failedTasks = 0
+
+  var firstLaunchTime = Long.MaxValue
+
+  val metrics = new MetricsTracker()
+
+  val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
+
+  def executorSummary(executorId: String): LiveExecutorStageSummary = {
+    executorSummaries.getOrElseUpdate(executorId,
+      new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))
+  }
+
+  override protected def doUpdate(): Any = {
+    val update = new v1.StageData(
+      status,
+      info.stageId,
+      info.attemptId,
+
+      activeTasks,
+      completedTasks,
+      failedTasks,
+
+      metrics.executorRunTime,
+      metrics.executorCpuTime,
+      info.submissionTime.map(new Date(_)),
+      if (firstLaunchTime < Long.MaxValue) Some(new Date(firstLaunchTime)) else None,
+      info.completionTime.map(new Date(_)),
+
+      metrics.inputBytes,
+      metrics.inputRecords,
+      metrics.outputBytes,
+      metrics.outputRecords,
+      metrics.shuffleReadBytes,
+      metrics.shuffleReadRecords,
+      metrics.shuffleWriteBytes,
+      metrics.shuffleWriteRecords,
+      metrics.memoryBytesSpilled,
+      metrics.diskBytesSpilled,
+
+      info.name,
+      info.details,
+      schedulingPool,
+
+      newAccumulatorInfos(info.accumulables.values),
+      None,
+      None)
+
+    new StageDataWrapper(update, jobIds)
+  }
+
+}
+
+private class LiveRDDPartition(val blockName: String) {
+
+  var executors = Set[String]()
+  var storageLevel: String = null
+  var memoryUsed = 0L
+  var diskUsed = 0L
+
+  def toApi(): v1.RDDPartitionInfo = {
+    new v1.RDDPartitionInfo(
+      blockName,
+      storageLevel,
+      memoryUsed,
+      diskUsed,
+      executors.toSeq.sorted)
+  }
+
+}
+
+private class LiveRDDDistribution(val exec: LiveExecutor) {
+
+  var memoryRemaining = exec.maxMemory
+  var memoryUsed = 0L
+  var diskUsed = 0L
+
+  var onHeapUsed = 0L
+  var offHeapUsed = 0L
+  var onHeapRemaining = 0L
+  var offHeapRemaining = 0L
+
+  def toApi(): v1.RDDDataDistribution = {
+    new v1.RDDDataDistribution(
+      exec.hostPort,
+      memoryUsed,
+      memoryRemaining,
+      diskUsed,
+      if (exec.hasMemoryInfo) Some(onHeapUsed) else None,
+      if (exec.hasMemoryInfo) Some(offHeapUsed) else None,
+      if (exec.hasMemoryInfo) Some(onHeapRemaining) else None,
+      if (exec.hasMemoryInfo) Some(offHeapRemaining) else None)
+  }
+
+}
+
+private class LiveRDD(info: RDDInfo) extends LiveEntity {
+
+  var storageLevel: String = info.storageLevel.description
+  var memoryUsed = 0L
+  var diskUsed = 0L
+
+  private val partitions = new HashMap[String, LiveRDDPartition]()
+  private val distributions = new HashMap[String, LiveRDDDistribution]()
+
+  def partition(blockName: String): LiveRDDPartition = {
+    partitions.getOrElseUpdate(blockName, new LiveRDDPartition(blockName))
+  }
+
+  def removePartition(blockName: String): Unit = partitions.remove(blockName)
+
+  def distribution(exec: LiveExecutor): LiveRDDDistribution = {
+    distributions.getOrElseUpdate(exec.hostPort, new LiveRDDDistribution(exec))
+  }
+
+  def removeDistribution(exec: LiveExecutor): Unit = {
+    distributions.remove(exec.hostPort)
+  }
+
+  override protected def doUpdate(): Any = {
+    val parts = if (partitions.nonEmpty) {
+      Some(partitions.values.toList.sortBy(_.blockName).map(_.toApi()))
+    } else {
+      None
+    }
+
+    val dists = if (distributions.nonEmpty) {
+      Some(distributions.values.toList.sortBy(_.exec.executorId).map(_.toApi()))
+    } else {
+      None
+    }
+
+    val rdd = new v1.RDDStorageInfo(
+      info.id,
+      info.name,
+      info.numPartitions,
+      partitions.size,
+      storageLevel,
+      memoryUsed,
+      diskUsed,
+      dists,
+      parts)
+
+    new RDDStorageInfoWrapper(rdd)
+  }
+
+}
+
+private object LiveEntityHelpers {
+
+  def newAccumulatorInfos(accums: Iterable[AccumulableInfo]): Seq[v1.AccumulableInfo] = {
+    accums
+      .filter { acc =>
+        // We don't need to store internal or SQL accumulables as their values will be shown in
+        // other places, so drop them to reduce the memory usage.
+        !acc.internal && (!acc.metadata.isDefined ||
+          acc.metadata.get != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
+      }
+      .map { acc =>
+        new v1.AccumulableInfo(
+          acc.id,
+          acc.name.map(_.intern()).orNull,
+          acc.update.map(_.toString()),
+          acc.value.map(_.toString()).orNull)
+      }
+      .toSeq
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
index 4a4ed95..5f69949 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala
@@ -71,7 +71,7 @@ private[v1] object AllStagesResource {
 
     val taskData = if (includeDetails) {
       Some(stageUiData.taskData.map { case (k, v) =>
-        k -> convertTaskData(v, stageUiData.lastUpdateTime) })
+        k -> convertTaskData(v, stageUiData.lastUpdateTime) }.toMap)
     } else {
       None
     }
@@ -88,7 +88,7 @@ private[v1] object AllStagesResource {
           memoryBytesSpilled = summary.memoryBytesSpilled,
           diskBytesSpilled = summary.diskBytesSpilled
         )
-      })
+      }.toMap)
     } else {
       None
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 31659b2..bff6f90 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -16,11 +16,11 @@
  */
 package org.apache.spark.status.api.v1
 
+import java.lang.{Long => JLong}
 import java.util.Date
 
-import scala.collection.Map
-
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
 
 import org.apache.spark.JobExecutionStatus
 
@@ -129,9 +129,13 @@ class RDDDataDistribution private[spark](
     val memoryUsed: Long,
     val memoryRemaining: Long,
     val diskUsed: Long,
+    @JsonDeserialize(contentAs = classOf[JLong])
     val onHeapMemoryUsed: Option[Long],
+    @JsonDeserialize(contentAs = classOf[JLong])
     val offHeapMemoryUsed: Option[Long],
+    @JsonDeserialize(contentAs = classOf[JLong])
     val onHeapMemoryRemaining: Option[Long],
+    @JsonDeserialize(contentAs = classOf[JLong])
     val offHeapMemoryRemaining: Option[Long])
 
 class RDDPartitionInfo private[spark](
@@ -179,7 +183,8 @@ class TaskData private[spark](
     val index: Int,
     val attempt: Int,
     val launchTime: Date,
-    val duration: Option[Long] = None,
+    @JsonDeserialize(contentAs = classOf[JLong])
+    val duration: Option[Long],
     val executorId: String,
     val host: String,
     val status: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
new file mode 100644
index 0000000..9579acc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import com.fasterxml.jackson.annotation.JsonIgnore
+
+import org.apache.spark.status.KVUtils._
+import org.apache.spark.status.api.v1._
+import org.apache.spark.util.kvstore.KVIndex
+
+private[spark] class ApplicationInfoWrapper(val info: ApplicationInfo) {
+
+  @JsonIgnore @KVIndex
+  def id: String = info.id
+
+}
+
+private[spark] class ExecutorSummaryWrapper(val info: ExecutorSummary) {
+
+  @JsonIgnore @KVIndex
+  private[this] val id: String = info.id
+
+  @JsonIgnore @KVIndex("active")
+  private[this] val active: Boolean = info.isActive
+
+  @JsonIgnore @KVIndex("host")
+  val host: String = info.hostPort.split(":")(0)
+
+}
+
+/**
+ * Keep track of the existing stages when the job was submitted, and those that were
+ * completed during the job's execution. This allows a more accurate accounting of how
+ * many tasks were skipped for the job.
+ */
+private[spark] class JobDataWrapper(
+    val info: JobData,
+    val skippedStages: Set[Int]) {
+
+  @JsonIgnore @KVIndex
+  private[this] val id: Int = info.jobId
+
+}
+
+private[spark] class StageDataWrapper(
+    val info: StageData,
+    val jobIds: Set[Int]) {
+
+  @JsonIgnore @KVIndex
+  def id: Array[Int] = Array(info.stageId, info.attemptId)
+
+}
+
+private[spark] class TaskDataWrapper(val info: TaskData) {
+
+  @JsonIgnore @KVIndex
+  def id: Long = info.taskId
+
+}
+
+private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
+
+  @JsonIgnore @KVIndex
+  def id: Int = info.id
+
+  @JsonIgnore @KVIndex("cached")
+  def cached: Boolean = info.numCachedPartitions > 0
+
+}
+
+private[spark] class ExecutorStageSummaryWrapper(
+    val stageId: Int,
+    val stageAttemptId: Int,
+    val executorId: String,
+    val info: ExecutorStageSummary) {
+
+  @JsonIgnore @KVIndex
+  val id: Array[Any] = Array(stageId, stageAttemptId, executorId)
+
+  @JsonIgnore @KVIndex("stage")
+  private[this] val stage: Array[Int] = Array(stageId, stageAttemptId)
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 2141934..03bd3ea 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -611,7 +611,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
 
     // Manually overwrite the version in the listing db; this should cause the new provider to
     // discard all data because the versions don't match.
-    val meta = new KVStoreMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1,
+    val meta = new FsHistoryProviderMetadata(FsHistoryProvider.CURRENT_LISTING_VERSION + 1,
       conf.get(LOCAL_STORE_DIR).get)
     oldProvider.listing.setMetadata(meta)
     oldProvider.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
new file mode 100644
index 0000000..6f7a0c1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -0,0 +1,690 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import java.io.File
+import java.util.{Date, Properties}
+
+import scala.collection.JavaConverters._
+import scala.reflect.{classTag, ClassTag}
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster._
+import org.apache.spark.status.api.v1
+import org.apache.spark.storage._
+import org.apache.spark.util.Utils
+import org.apache.spark.util.kvstore._
+
+class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private var time: Long = _
+  private var testDir: File = _
+  private var store: KVStore = _
+
+  before {
+    time = 0L
+    testDir = Utils.createTempDir()
+    store = KVUtils.open(testDir, getClass().getName())
+  }
+
+  after {
+    store.close()
+    Utils.deleteRecursively(testDir)
+  }
+
+  test("scheduler events") {
+    val listener = new AppStatusListener(store)
+
+    // Start the application.
+    time += 1
+    listener.onApplicationStart(SparkListenerApplicationStart(
+      "name",
+      Some("id"),
+      time,
+      "user",
+      Some("attempt"),
+      None))
+
+    check[ApplicationInfoWrapper]("id") { app =>
+      assert(app.info.name === "name")
+      assert(app.info.id === "id")
+      assert(app.info.attempts.size === 1)
+
+      val attempt = app.info.attempts.head
+      assert(attempt.attemptId === Some("attempt"))
+      assert(attempt.startTime === new Date(time))
+      assert(attempt.lastUpdated === new Date(time))
+      assert(attempt.endTime.getTime() === -1L)
+      assert(attempt.sparkUser === "user")
+      assert(!attempt.completed)
+    }
+
+    // Start a couple of executors.
+    time += 1
+    val execIds = Array("1", "2")
+
+    execIds.foreach { id =>
+      listener.onExecutorAdded(SparkListenerExecutorAdded(time, id,
+        new ExecutorInfo(s"$id.example.com", 1, Map())))
+    }
+
+    execIds.foreach { id =>
+      check[ExecutorSummaryWrapper](id) { exec =>
+        assert(exec.info.id === id)
+        assert(exec.info.hostPort === s"$id.example.com")
+        assert(exec.info.isActive)
+      }
+    }
+
+    // Start a job with 2 stages / 4 tasks each
+    time += 1
+    val stages = Seq(
+      new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
+      new StageInfo(2, 0, "stage2", 4, Nil, Seq(1), "details2"))
+
+    val jobProps = new Properties()
+    jobProps.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "jobGroup")
+    jobProps.setProperty("spark.scheduler.pool", "schedPool")
+
+    listener.onJobStart(SparkListenerJobStart(1, time, stages, jobProps))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.jobId === 1)
+      assert(job.info.name === stages.last.name)
+      assert(job.info.description === None)
+      assert(job.info.status === JobExecutionStatus.RUNNING)
+      assert(job.info.submissionTime === Some(new Date(time)))
+      assert(job.info.jobGroup === Some("jobGroup"))
+    }
+
+    stages.foreach { info =>
+      check[StageDataWrapper](key(info)) { stage =>
+        assert(stage.info.status === v1.StageStatus.PENDING)
+        assert(stage.jobIds === Set(1))
+      }
+    }
+
+    // Submit stage 1
+    time += 1
+    stages.head.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stages.head, jobProps))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.status === v1.StageStatus.ACTIVE)
+      assert(stage.info.submissionTime === Some(new Date(stages.head.submissionTime.get)))
+      assert(stage.info.schedulingPool === "schedPool")
+    }
+
+    // Start tasks from stage 1
+    time += 1
+    var _taskIdTracker = -1L
+    def nextTaskId(): Long = {
+      _taskIdTracker += 1
+      _taskIdTracker
+    }
+
+    def createTasks(count: Int, time: Long): Seq[TaskInfo] = {
+      (1 to count).map { id =>
+        val exec = execIds(id.toInt % execIds.length)
+        val taskId = nextTaskId()
+        new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com",
+          TaskLocality.PROCESS_LOCAL, id % 2 == 0)
+      }
+    }
+
+    val s1Tasks = createTasks(4, time)
+    s1Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
+    }
+
+    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveTasks === s1Tasks.size)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numActiveTasks === s1Tasks.size)
+      assert(stage.info.firstTaskLaunchedTime === Some(new Date(s1Tasks.head.launchTime)))
+    }
+
+    s1Tasks.foreach { task =>
+      check[TaskDataWrapper](task.taskId) { wrapper =>
+        assert(wrapper.info.taskId === task.taskId)
+        assert(wrapper.info.index === task.index)
+        assert(wrapper.info.attempt === task.attemptNumber)
+        assert(wrapper.info.launchTime === new Date(task.launchTime))
+        assert(wrapper.info.executorId === task.executorId)
+        assert(wrapper.info.host === task.host)
+        assert(wrapper.info.status === task.status)
+        assert(wrapper.info.taskLocality === task.taskLocality.toString())
+        assert(wrapper.info.speculative === task.speculative)
+      }
+    }
+
+    // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code.
+    s1Tasks.foreach { task =>
+      val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
+        Some(1L), None, true, false, None)
+      listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
+        task.executorId,
+        Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.memoryBytesSpilled === s1Tasks.size)
+    }
+
+    val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+      .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
+    assert(execs.size > 0)
+    execs.foreach { exec =>
+      assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
+    }
+
+    // Fail one of the tasks, re-start it.
+    time += 1
+    s1Tasks.head.markFinished(TaskState.FAILED, time)
+    listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+      "taskType", TaskResultLost, s1Tasks.head, null))
+
+    time += 1
+    val reattempt = {
+      val orig = s1Tasks.head
+      // Task reattempts have a different ID, but the same index as the original.
+      new TaskInfo(nextTaskId(), orig.index, orig.attemptNumber + 1, time, orig.executorId,
+        s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative)
+    }
+    listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+      reattempt))
+
+    assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numFailedTasks === 1)
+      assert(job.info.numActiveTasks === s1Tasks.size)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numFailedTasks === 1)
+      assert(stage.info.numActiveTasks === s1Tasks.size)
+    }
+
+    check[TaskDataWrapper](s1Tasks.head.taskId) { task =>
+      assert(task.info.status === s1Tasks.head.status)
+      assert(task.info.duration === Some(s1Tasks.head.duration))
+      assert(task.info.errorMessage === Some(TaskResultLost.toErrorString))
+    }
+
+    check[TaskDataWrapper](reattempt.taskId) { task =>
+      assert(task.info.index === s1Tasks.head.index)
+      assert(task.info.attempt === reattempt.attemptNumber)
+    }
+
+    // Succeed all tasks in stage 1.
+    val pending = s1Tasks.drop(1) ++ Seq(reattempt)
+
+    val s1Metrics = TaskMetrics.empty
+    s1Metrics.setExecutorCpuTime(2L)
+    s1Metrics.setExecutorRunTime(4L)
+
+    time += 1
+    pending.foreach { task =>
+      task.markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+        "taskType", Success, task, s1Metrics))
+    }
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numFailedTasks === 1)
+      assert(job.info.numActiveTasks === 0)
+      assert(job.info.numCompletedTasks === pending.size)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numFailedTasks === 1)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === pending.size)
+    }
+
+    pending.foreach { task =>
+      check[TaskDataWrapper](task.taskId) { wrapper =>
+        assert(wrapper.info.errorMessage === None)
+        assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L)
+        assert(wrapper.info.taskMetrics.get.executorRunTime === 4L)
+      }
+    }
+
+    assert(store.count(classOf[TaskDataWrapper]) === pending.size + 1)
+
+    // End stage 1.
+    time += 1
+    stages.head.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(stages.head))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 0)
+      assert(job.info.numCompletedStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.head)) { stage =>
+      assert(stage.info.status === v1.StageStatus.COMPLETE)
+      assert(stage.info.numFailedTasks === 1)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === pending.size)
+    }
+
+    // Submit stage 2.
+    time += 1
+    stages.last.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stages.last, jobProps))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.last)) { stage =>
+      assert(stage.info.status === v1.StageStatus.ACTIVE)
+      assert(stage.info.submissionTime === Some(new Date(stages.last.submissionTime.get)))
+    }
+
+    // Start and fail all tasks of stage 2.
+    time += 1
+    val s2Tasks = createTasks(4, time)
+    s2Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task))
+    }
+
+    time += 1
+    s2Tasks.foreach { task =>
+      task.markFinished(TaskState.FAILED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId,
+        "taskType", TaskResultLost, task, null))
+    }
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numFailedTasks === 1 + s2Tasks.size)
+      assert(job.info.numActiveTasks === 0)
+    }
+
+    check[StageDataWrapper](key(stages.last)) { stage =>
+      assert(stage.info.numFailedTasks === s2Tasks.size)
+      assert(stage.info.numActiveTasks === 0)
+    }
+
+    // Fail stage 2.
+    time += 1
+    stages.last.completionTime = Some(time)
+    stages.last.failureReason = Some("uh oh")
+    listener.onStageCompleted(SparkListenerStageCompleted(stages.last))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numCompletedStages === 1)
+      assert(job.info.numFailedStages === 1)
+    }
+
+    check[StageDataWrapper](key(stages.last)) { stage =>
+      assert(stage.info.status === v1.StageStatus.FAILED)
+      assert(stage.info.numFailedTasks === s2Tasks.size)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === 0)
+    }
+
+    // Re-submit stage 2, run all of its tasks again, and succeed them and the stage this time.
+    val oldS2 = stages.last
+    val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks,
+      oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
+
+    time += 1
+    newS2.submissionTime = Some(time)
+    listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps))
+    assert(store.count(classOf[StageDataWrapper]) === 3)
+
+    val newS2Tasks = createTasks(4, time)
+
+    newS2Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
+    }
+
+    time += 1
+    newS2Tasks.foreach { task =>
+      task.markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success,
+        task, null))
+    }
+
+    time += 1
+    newS2.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(newS2))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numActiveStages === 0)
+      assert(job.info.numFailedStages === 1)
+      assert(job.info.numCompletedStages === 2)
+    }
+
+    check[StageDataWrapper](key(newS2)) { stage =>
+      assert(stage.info.status === v1.StageStatus.COMPLETE)
+      assert(stage.info.numActiveTasks === 0)
+      assert(stage.info.numCompleteTasks === newS2Tasks.size)
+    }
+
+    // End job.
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(1, time, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.status === JobExecutionStatus.SUCCEEDED)
+    }
+
+    // Submit a second job that re-uses stage 1 and stage 2. Stage 1 won't be re-run, but
+    // stage 2 will. In any case, the DAGScheduler creates new info structures that are copies
+    // of the old stages, so mimic that behavior here. The "new" stage 1 is submitted without
+    // a submission time, which means it is "skipped", and the stage 2 re-execution should not
+    // change the stats of the already finished job.
+    time += 1
+    val j2Stages = Seq(
+      new StageInfo(3, 0, "stage1", 4, Nil, Nil, "details1"),
+      new StageInfo(4, 0, "stage2", 4, Nil, Seq(3), "details2"))
+    j2Stages.last.submissionTime = Some(time)
+    listener.onJobStart(SparkListenerJobStart(2, time, j2Stages, null))
+    assert(store.count(classOf[JobDataWrapper]) === 2)
+
+    listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.head, jobProps))
+    listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.head))
+    listener.onStageSubmitted(SparkListenerStageSubmitted(j2Stages.last, jobProps))
+    assert(store.count(classOf[StageDataWrapper]) === 5)
+
+    time += 1
+    val j2s2Tasks = createTasks(4, time)
+
+    j2s2Tasks.foreach { task =>
+      listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId,
+        task))
+    }
+
+    time += 1
+    j2s2Tasks.foreach { task =>
+      task.markFinished(TaskState.FINISHED, time)
+      listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptId,
+        "taskType", Success, task, null))
+    }
+
+    time += 1
+    j2Stages.last.completionTime = Some(time)
+    listener.onStageCompleted(SparkListenerStageCompleted(j2Stages.last))
+
+    time += 1
+    listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
+
+    check[JobDataWrapper](1) { job =>
+      assert(job.info.numCompletedStages === 2)
+      assert(job.info.numCompletedTasks === s1Tasks.size + s2Tasks.size)
+    }
+
+    check[JobDataWrapper](2) { job =>
+      assert(job.info.status === JobExecutionStatus.SUCCEEDED)
+      assert(job.info.numCompletedStages === 1)
+      assert(job.info.numCompletedTasks === j2s2Tasks.size)
+      assert(job.info.numSkippedStages === 1)
+      assert(job.info.numSkippedTasks === s1Tasks.size)
+    }
+
+    // Blacklist an executor.
+    time += 1
+    listener.onExecutorBlacklisted(SparkListenerExecutorBlacklisted(time, "1", 42))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(exec.info.isBlacklisted)
+    }
+
+    time += 1
+    listener.onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted(time, "1"))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(!exec.info.isBlacklisted)
+    }
+
+    // Blacklist a node.
+    time += 1
+    listener.onNodeBlacklisted(SparkListenerNodeBlacklisted(time, "1.example.com", 2))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(exec.info.isBlacklisted)
+    }
+
+    time += 1
+    listener.onNodeUnblacklisted(SparkListenerNodeUnblacklisted(time, "1.example.com"))
+    check[ExecutorSummaryWrapper]("1") { exec =>
+      assert(!exec.info.isBlacklisted)
+    }
+
+    // Stop executors.
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test"))
+    listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test"))
+
+    Seq("1", "2").foreach { id =>
+      check[ExecutorSummaryWrapper](id) { exec =>
+        assert(exec.info.id === id)
+        assert(!exec.info.isActive)
+      }
+    }
+
+    // End the application.
+    listener.onApplicationEnd(SparkListenerApplicationEnd(42L))
+
+    check[ApplicationInfoWrapper]("id") { app =>
+      assert(app.info.name === "name")
+      assert(app.info.id === "id")
+      assert(app.info.attempts.size === 1)
+
+      val attempt = app.info.attempts.head
+      assert(attempt.attemptId === Some("attempt"))
+      assert(attempt.startTime === new Date(1L))
+      assert(attempt.lastUpdated === new Date(42L))
+      assert(attempt.endTime === new Date(42L))
+      assert(attempt.duration === 41L)
+      assert(attempt.sparkUser === "user")
+      assert(attempt.completed)
+    }
+  }
+
+  test("storage events") {
+    val listener = new AppStatusListener(store)
+    val maxMemory = 42L
+
+    // Register a couple of block managers.
+    val bm1 = BlockManagerId("1", "1.example.com", 42)
+    val bm2 = BlockManagerId("2", "2.example.com", 84)
+    Seq(bm1, bm2).foreach { bm =>
+      listener.onExecutorAdded(SparkListenerExecutorAdded(1L, bm.executorId,
+        new ExecutorInfo(bm.host, 1, Map())))
+      listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm, maxMemory))
+      check[ExecutorSummaryWrapper](bm.executorId) { exec =>
+        assert(exec.info.maxMemory === maxMemory)
+      }
+    }
+
+    val rdd1b1 = RDDBlockId(1, 1)
+    val level = StorageLevel.MEMORY_AND_DISK
+
+    // Submit a stage and make sure the RDD is recorded.
+    val rddInfo = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil)
+    val stage = new StageInfo(1, 0, "stage1", 4, Seq(rddInfo), Nil, "details1")
+    listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties()))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.name === rddInfo.name)
+      assert(wrapper.info.numPartitions === rddInfo.numPartitions)
+      assert(wrapper.info.storageLevel === rddInfo.storageLevel.description)
+    }
+
+    // Add partition 1 replicated on two block managers.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1, level, 1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 1L)
+      assert(wrapper.info.diskUsed === 1L)
+
+      assert(wrapper.info.dataDistribution.isDefined)
+      assert(wrapper.info.dataDistribution.get.size === 1)
+
+      val dist = wrapper.info.dataDistribution.get.head
+      assert(dist.address === bm1.hostPort)
+      assert(dist.memoryUsed === 1L)
+      assert(dist.diskUsed === 1L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      assert(wrapper.info.partitions.isDefined)
+      assert(wrapper.info.partitions.get.size === 1)
+
+      val part = wrapper.info.partitions.get.head
+      assert(part.blockName === rdd1b1.name)
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === 1L)
+      assert(part.diskUsed === 1L)
+      assert(part.executors === Seq(bm1.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === 1L)
+      assert(exec.info.diskUsed === 1L)
+    }
+
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1, level, 1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 2L)
+      assert(wrapper.info.diskUsed === 2L)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 1L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm2.hostPort).get
+      assert(dist.memoryUsed === 1L)
+      assert(dist.diskUsed === 1L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get(0)
+      assert(part.memoryUsed === 2L)
+      assert(part.diskUsed === 2L)
+      assert(part.executors === Seq(bm1.executorId, bm2.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === 1L)
+      assert(exec.info.diskUsed === 1L)
+    }
+
+    // Add a second partition only to bm 1.
+    val rdd1b2 = RDDBlockId(1, 2)
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b2, level,
+      3L, 3L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 5L)
+      assert(wrapper.info.diskUsed === 5L)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
+      assert(dist.memoryUsed === 4L)
+      assert(dist.diskUsed === 4L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b2.name).get
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === 3L)
+      assert(part.diskUsed === 3L)
+      assert(part.executors === Seq(bm1.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === 4L)
+      assert(exec.info.diskUsed === 4L)
+    }
+
+    // Remove block 1 from bm 1.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm1, rdd1b1,
+      StorageLevel.NONE, 1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 4L)
+      assert(wrapper.info.diskUsed === 4L)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+
+      val dist = wrapper.info.dataDistribution.get.find(_.address == bm1.hostPort).get
+      assert(dist.memoryUsed === 3L)
+      assert(dist.diskUsed === 3L)
+      assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)
+
+      val part = wrapper.info.partitions.get.find(_.blockName === rdd1b1.name).get
+      assert(part.storageLevel === level.description)
+      assert(part.memoryUsed === 1L)
+      assert(part.diskUsed === 1L)
+      assert(part.executors === Seq(bm2.executorId))
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 1L)
+      assert(exec.info.memoryUsed === 3L)
+      assert(exec.info.diskUsed === 3L)
+    }
+
+    // Remove block 1 from bm 2. This should leave only block 2 info in the store.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(BlockUpdatedInfo(bm2, rdd1b1,
+      StorageLevel.NONE, 1L, 1L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.memoryUsed === 3L)
+      assert(wrapper.info.diskUsed === 3L)
+      assert(wrapper.info.dataDistribution.get.size === 1L)
+      assert(wrapper.info.partitions.get.size === 1L)
+      assert(wrapper.info.partitions.get(0).blockName === rdd1b2.name)
+    }
+
+    check[ExecutorSummaryWrapper](bm2.executorId) { exec =>
+      assert(exec.info.rddBlocks === 0L)
+      assert(exec.info.memoryUsed === 0L)
+      assert(exec.info.diskUsed === 0L)
+    }
+
+    // Unpersist RDD1.
+    listener.onUnpersistRDD(SparkListenerUnpersistRDD(rdd1b1.rddId))
+    intercept[NoSuchElementException] {
+      check[RDDStorageInfoWrapper](rdd1b1.rddId) { _ => () }
+    }
+
+  }
+
+  private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
+
+  private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
+    val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]
+    fn(value)
+  }
+
+}
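
For readers who want to exercise the listener outside this suite, here is a minimal, self-contained
sketch of the same flow. It assumes the in-memory KVStore implementation from the common/kvstore
module and that the snippet sits in the same org.apache.spark.status package as the suite, so the
wrapper types are in scope; the asserted values mirror the "scheduler events" test above.

    import org.apache.spark.scheduler.SparkListenerApplicationStart
    import org.apache.spark.util.kvstore.InMemoryStore

    // An in-memory store avoids the temp-dir / LevelDB setup done in the suite's before {}.
    val store = new InMemoryStore()
    val listener = new AppStatusListener(store)

    // Feed one event by hand and read the resulting wrapper back by its natural key,
    // which is exactly what the suite's check[T] helper does via store.read().
    listener.onApplicationStart(SparkListenerApplicationStart(
      "name", Some("id"), 1L, "user", Some("attempt"), None))
    val app = store.read(classOf[ApplicationInfoWrapper], "id")
    assert(app.info.attempts.head.sparkUser == "user")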

http://git-wip-us.apache.org/repos/asf/spark/blob/0e9a750a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index dd299e0..45b8870 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,8 @@ object MimaExcludes {
 
   // Exclude rules for 2.3.x
   lazy val v23excludes = v22excludes ++ Seq(
+    // SPARK-18085: Better History Server scalability for many / large applications
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.ExecutorSummary.executorLogs"),
     // [SPARK-20495][SQL] Add StorageLevel to cacheTable API
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org