Posted to commits@spark.apache.org by mc...@apache.org on 2018/09/07 17:42:53 UTC

[3/3] spark git commit: [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

Add new executor-level memory metrics (JVM used memory, on/off-heap execution memory, on/off-heap storage memory, on/off-heap unified memory, direct memory, and mapped memory), and expose them via the executors REST API. This information provides insight into how executor and driver JVM memory is used across the different memory regions, and can help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.
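
For example, once peak usage is known from the new metrics, the relevant settings can be adjusted accordingly. A minimal sketch (the values below are hypothetical, for illustration only, not recommendations from this patch):

```scala
import org.apache.spark.SparkConf

// Hypothetical values chosen after inspecting observed peaks; adjust per workload.
val conf = new SparkConf()
  .set("spark.executor.memory", "4g")
  .set("spark.memory.fraction", "0.6")
  .set("spark.memory.storageFraction", "0.5")
```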

## What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory, and mappedMemory. The new ExecutorMetrics is sent by executors to the driver as part of the Heartbeat. A heartbeat is also added for the driver, to collect these metrics for the driver.
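
A minimal sketch of how application code could consume these updates from the listener bus, using only classes added or extended in this patch (registration via sc.addSparkListener is assumed):

```scala
import org.apache.spark.metrics.{JVMHeapMemory, OnHeapExecutionMemory}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate}

// Reads the new executor-level metrics carried by the heartbeat-driven
// SparkListenerExecutorMetricsUpdate events (posted only in live applications).
class PeakMemoryLogger extends SparkListener {
  override def onExecutorMetricsUpdate(
      event: SparkListenerExecutorMetricsUpdate): Unit = {
    event.executorUpdates.foreach { metrics =>
      val jvmHeap = metrics.getMetricValue(JVMHeapMemory)
      val onHeapExec = metrics.getMetricValue(OnHeapExecutionMemory)
      println(s"executor ${event.execId}: jvmHeap=$jvmHeap onHeapExecution=$onHeapExec")
    }
  }
}
```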

The EventLoggingListener stores the peak values for each metric, per active stage and executor. When a StageCompleted event is seen, a StageExecutorMetrics event is logged for each executor, with the peak values for the stage.
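
Logging of these per-stage peaks is gated by the new spark.eventLog.logStageExecutorMetrics.enabled flag (default false). A minimal sketch of a replay-side listener; onStageExecutorMetrics is only invoked when reading from the event log (e.g. in the history server), never in a live application:

```scala
import org.apache.spark.metrics.JVMHeapMemory
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageExecutorMetrics}

// Reports the per-stage peak JVM heap usage recorded for each executor
// at stage completion, as replayed from the event log.
class StagePeakLogger extends SparkListener {
  override def onStageExecutorMetrics(
      event: SparkListenerStageExecutorMetrics): Unit = {
    val peakHeap = event.executorMetrics.getMetricValue(JVMHeapMemory)
    println(s"stage ${event.stageId} (attempt ${event.stageAttemptId}), " +
      s"executor ${event.execId}: peak JVM heap $peakHeap bytes")
  }
}
```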

The AppStatusListener records the peak values for each memory metric.

The new memory metrics are added to the executors REST API.
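
A quick way to inspect the new peakMemoryMetrics field is the executors endpoint of the REST API. A minimal sketch, assuming a history server on its default port and using the application id from the test data in this patch (both are placeholders, substitute your own):

```scala
import scala.io.Source

object FetchPeakMemoryMetrics {
  def main(args: Array[String]): Unit = {
    // Placeholder host/port and application id; adjust for your environment.
    val appId = "application_1506645932520_24630151"
    val url = s"http://localhost:18080/api/v1/applications/$appId/executors"
    // Each executor summary now carries an optional "peakMemoryMetrics" object,
    // keyed by the ExecutorMetricType names (JVMHeapMemory, OnHeapExecutionMemory, ...).
    println(Source.fromURL(url).mkString)
  }
}
```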

## How was this patch tested?

New unit tests have been added. This was also tested on our cluster.

Author: Edwina Lu <ed...@linkedin.com>
Author: Imran Rashid <ir...@cloudera.com>
Author: edwinalu <ed...@gmail.com>

Closes #21221 from edwinalu/SPARK-23429.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9241e1e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9241e1e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9241e1e7

Branch: refs/heads/master
Commit: 9241e1e7e66574cfafa68791771959dfc39c9684
Parents: 458f501
Author: Edwina Lu <ed...@linkedin.com>
Authored: Fri Sep 7 10:42:46 2018 -0700
Committer: mcheah <mc...@palantir.com>
Committed: Fri Sep 7 10:42:46 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkFirehoseListener.java |   6 +
 .../org/apache/spark/HeartbeatReceiver.scala    |   8 +-
 .../scala/org/apache/spark/Heartbeater.scala    |  71 +++++
 .../scala/org/apache/spark/SparkContext.scala   |  20 ++
 .../org/apache/spark/executor/Executor.scala    |  36 +--
 .../apache/spark/executor/ExecutorMetrics.scala |  81 +++++
 .../apache/spark/internal/config/package.scala  |   5 +
 .../org/apache/spark/memory/MemoryManager.scala |  28 ++
 .../spark/metrics/ExecutorMetricType.scala      | 104 ++++++
 .../apache/spark/scheduler/DAGScheduler.scala   |   9 +-
 .../spark/scheduler/EventLoggingListener.scala  |  54 +++-
 .../apache/spark/scheduler/SparkListener.scala  |  32 +-
 .../spark/scheduler/SparkListenerBus.scala      |   2 +
 .../apache/spark/scheduler/TaskScheduler.scala  |  10 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  13 +-
 .../apache/spark/status/AppStatusListener.scala |  44 ++-
 .../org/apache/spark/status/LiveEntity.scala    |   9 +-
 .../org/apache/spark/status/api/v1/api.scala    |  39 ++-
 .../org/apache/spark/util/JsonProtocol.scala    |  47 ++-
 .../application_list_json_expectation.json      |  15 +
 .../completed_app_list_json_expectation.json    |  15 +
 ..._with_executor_metrics_json_expectation.json | 314 +++++++++++++++++++
 .../limit_app_list_json_expectation.json        |  30 +-
 .../minDate_app_list_json_expectation.json      |  15 +
 .../minEndDate_app_list_json_expectation.json   |  17 +-
 .../application_1506645932520_24630151          |  63 ++++
 .../apache/spark/HeartbeatReceiverSuite.scala   |  11 +-
 .../deploy/history/HistoryServerSuite.scala     |   3 +
 .../spark/scheduler/DAGSchedulerSuite.scala     |   7 +-
 .../scheduler/EventLoggingListenerSuite.scala   | 221 ++++++++++++-
 .../scheduler/ExternalClusterManagerSuite.scala |   4 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |   5 +-
 .../spark/status/AppStatusListenerSuite.scala   | 162 +++++++++-
 .../apache/spark/util/JsonProtocolSuite.scala   | 107 ++++++-
 dev/.rat-excludes                               |   1 +
 project/MimaExcludes.scala                      |   6 +
 36 files changed, 1531 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 94c5c11..731f6fc 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -104,6 +104,12 @@ public class SparkFirehoseListener implements SparkListenerInterface {
   }
 
   @Override
+  public final void onStageExecutorMetrics(
+      SparkListenerStageExecutorMetrics executorMetrics) {
+    onEvent(executorMetrics);
+  }
+
+  @Override
   public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
     onEvent(executorAdded);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index bcbc8df..ab0ae55 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
 import scala.collection.mutable
 import scala.concurrent.Future
 
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
@@ -37,7 +38,8 @@ import org.apache.spark.util._
 private[spark] case class Heartbeat(
     executorId: String,
     accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
-    blockManagerId: BlockManagerId)
+    blockManagerId: BlockManagerId,
+    executorUpdates: ExecutorMetrics) // executor level updates
 
 /**
  * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -119,14 +121,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       context.reply(true)
 
     // Messages received from executors
-    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
+    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
-                executorId, accumUpdates, blockManagerId)
+                executorId, accumUpdates, blockManagerId, executorMetrics)
               val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
               context.reply(response)
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/Heartbeater.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Heartbeater.scala b/core/src/main/scala/org/apache/spark/Heartbeater.scala
new file mode 100644
index 0000000..5ba1b9b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param memoryManager the memory manager for execution and storage memory.
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param name the thread name for the heartbeater.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(
+    memoryManager: MemoryManager,
+    reportHeartbeat: () => Unit,
+    name: String,
+    intervalMs: Long) extends Logging {
+  // Executor for the heartbeat task
+  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)
+
+  /** Schedules a task to report a heartbeat. */
+  def start(): Unit = {
+    // Wait a random interval so the heartbeats don't end up in sync
+    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+
+    val heartbeatTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat())
+    }
+    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
+  }
+
+  /** Stops the heartbeat thread. */
+  def stop(): Unit = {
+    heartbeater.shutdown()
+    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  /**
+   * Get the current executor level metrics. These are returned as an array, with the index
+   * determined by MetricGetter.values
+   */
+  def getCurrentMetrics(): ExecutorMetrics = {
+    val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray
+    new ExecutorMetrics(metrics)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e5b1e0e..d943087 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,6 +213,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
+  private var _heartbeater: Heartbeater = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -496,6 +497,11 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    // create and start the heartbeater for collecting memory metrics
+    _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
+      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    _heartbeater.start()
+
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
     // constructor
     _taskScheduler.start()
@@ -1959,6 +1965,12 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _eventLogger.foreach(_.stop())
     }
+    if (_heartbeater != null) {
+      Utils.tryLogNonFatalError {
+        _heartbeater.stop()
+      }
+      _heartbeater = null
+    }
     if (env != null && _heartbeatReceiver != null) {
       Utils.tryLogNonFatalError {
         env.rpcEnv.stop(_heartbeatReceiver)
@@ -2429,6 +2441,14 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
+    val driverUpdates = _heartbeater.getCurrentMetrics()
+    val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
+    listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
+      Some(driverUpdates)))
+  }
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having finished construction.
   // NOTE: this must be placed at the end of the SparkContext constructor.

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 86b1957..072277c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
+import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
@@ -148,7 +148,8 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
   // Executor for the heartbeat task.
-  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
+  private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
+    "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
@@ -167,7 +168,7 @@ private[spark] class Executor(
    */
   private var heartbeatFailures = 0
 
-  startDriverHeartbeater()
+  heartbeater.start()
 
   private[executor] def numRunningTasks: Int = runningTasks.size()
 
@@ -216,8 +217,12 @@ private[spark] class Executor(
 
   def stop(): Unit = {
     env.metricsSystem.report()
-    heartbeater.shutdown()
-    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    try {
+      heartbeater.stop()
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Unable to stop heartbeater", e)
+     }
     threadPool.shutdown()
     if (!isLocal) {
       env.stop()
@@ -787,6 +792,9 @@ private[spark] class Executor(
     val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
+    // get executor level memory metrics
+    val executorUpdates = heartbeater.getCurrentMetrics()
+
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
@@ -795,7 +803,8 @@ private[spark] class Executor(
       }
     }
 
-    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
+      executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
@@ -815,21 +824,6 @@ private[spark] class Executor(
         }
     }
   }
-
-  /**
-   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
-   */
-  private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-
-    // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
-
-    val heartbeatTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
-    }
-    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
-  }
 }
 
 private[spark] object Executor {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
new file mode 100644
index 0000000..2933f3b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+    metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+    this()
+    Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+    this()
+    (0 until ExecutorMetricType.values.length).foreach { idx =>
+      metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+    }
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current executor metric values,
+   * and update the value for any metrics where the new value for the metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
+    var updated = false
+
+    (0 until ExecutorMetricType.values.length).foreach { idx =>
+       if (executorMetrics.metrics(idx) > metrics(idx)) {
+        updated = true
+        metrics(idx) = executorMetrics.metrics(idx)
+      }
+    }
+    updated
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 319e664..ee41bd1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -69,6 +69,11 @@ package object config {
     .bytesConf(ByteUnit.KiB)
     .createWithDefaultString("100k")
 
+  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
+    ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 0641adc..4fde2d0 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -181,6 +181,34 @@ private[spark] abstract class MemoryManager(
   }
 
   /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = synchronized {
+    onHeapExecutionMemoryPool.memoryUsed
+  }
+
+  /**
+   *  Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = synchronized {
+    offHeapExecutionMemoryPool.memoryUsed
+  }
+
+  /**
+   *  On heap storage memory currently in use, in bytes.
+   */
+  final def onHeapStorageMemoryUsed: Long = synchronized {
+    onHeapStorageMemoryPool.memoryUsed
+  }
+
+  /**
+   *  Off heap storage memory currently in use, in bytes.
+   */
+  final def offHeapStorageMemoryUsed: Long = synchronized {
+    offHeapStorageMemoryPool.memoryUsed
+  }
+
+  /**
    * Returns the execution memory consumption, in bytes, for the given task.
    */
   private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
new file mode 100644
index 0000000..cd10dad
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
+  private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last
+}
+
+private[spark] abstract class MemoryManagerExecutorMetricType(
+    f: MemoryManager => Long) extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    f(memoryManager)
+  }
+}
+
+private[spark] abstract class MBeanExecutorMetricType(mBeanName: String)
+  extends ExecutorMetricType {
+  private val bean = ManagementFactory.newPlatformMXBeanProxy(
+    ManagementFactory.getPlatformMBeanServer,
+    new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean])
+
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    bean.getMemoryUsed
+  }
+}
+
+case object JVMHeapMemory extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
+  }
+}
+
+case object JVMOffHeapMemory extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
+  }
+}
+
+case object OnHeapExecutionMemory extends MemoryManagerExecutorMetricType(
+  _.onHeapExecutionMemoryUsed)
+
+case object OffHeapExecutionMemory extends MemoryManagerExecutorMetricType(
+  _.offHeapExecutionMemoryUsed)
+
+case object OnHeapStorageMemory extends MemoryManagerExecutorMetricType(
+  _.onHeapStorageMemoryUsed)
+
+case object OffHeapStorageMemory extends MemoryManagerExecutorMetricType(
+  _.offHeapStorageMemoryUsed)
+
+case object OnHeapUnifiedMemory extends MemoryManagerExecutorMetricType(
+  (m => m.onHeapExecutionMemoryUsed + m.onHeapStorageMemoryUsed))
+
+case object OffHeapUnifiedMemory extends MemoryManagerExecutorMetricType(
+  (m => m.offHeapExecutionMemoryUsed + m.offHeapStorageMemoryUsed))
+
+case object DirectPoolMemory extends MBeanExecutorMetricType(
+  "java.nio:type=BufferPool,name=direct")
+
+case object MappedPoolMemory extends MBeanExecutorMetricType(
+  "java.nio:type=BufferPool,name=mapped")
+
+private[spark] object ExecutorMetricType {
+  // List of all executor metric types
+  val values = IndexedSeq(
+    JVMHeapMemory,
+    JVMOffHeapMemory,
+    OnHeapExecutionMemory,
+    OffHeapExecutionMemory,
+    OnHeapStorageMemory,
+    OffHeapStorageMemory,
+    OnHeapUnifiedMemory,
+    OffHeapUnifiedMemory,
+    DirectPoolMemory,
+    MappedPoolMemory
+  )
+
+  // Map of executor metric type to its index in values.
+  val metricIdxMap =
+    Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: _*)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 50c91da..4710835 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,7 +35,7 @@ import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
@@ -264,8 +264,11 @@ private[spark] class DAGScheduler(
       execId: String,
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
-      blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
+      blockManagerId: BlockManagerId,
+      // executor metrics indexed by MetricGetter.values
+      executorUpdates: ExecutorMetrics): Boolean = {
+    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
+      Some(executorUpdates)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 69bc51c..1629e17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -23,8 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.EnumSet
 import java.util.Locale
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -36,6 +35,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
@@ -51,6 +51,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
+ *   spark.eventLog.logStageExecutorMetrics.enabled - Whether to log stage executor metrics
  */
 private[spark] class EventLoggingListener(
     appId: String,
@@ -69,6 +70,7 @@ private[spark] class EventLoggingListener(
   private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
   private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+  private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
   private val testing = sparkConf.get(EVENT_LOG_TESTING)
   private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
@@ -93,6 +95,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
+  // map of (stageId, stageAttempt), to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = Map.empty[(Int, Int), Map[String, ExecutorMetrics]]
+
   /**
    * Creates the log file in the configured log directory.
    */
@@ -155,7 +160,14 @@ private[spark] class EventLoggingListener(
   }
 
   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    logEvent(event)
+    if (shouldLogStageExecutorMetrics) {
+      // record the peak metrics for the new stage
+      liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
+        Map.empty[String, ExecutorMetrics])
+    }
+  }
 
   override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
 
@@ -169,6 +181,26 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    if (shouldLogStageExecutorMetrics) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each live executor,
+      // whether or not the executor is running tasks for the stage
+      val executorOpt = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorOpt.foreach { execMap =>
+        execMap.foreach { case (executorId, peakExecutorMetrics) =>
+            logEvent(new SparkListenerStageExecutorMetrics(executorId, event.stageInfo.stageId,
+              event.stageInfo.attemptNumber(), peakExecutorMetrics))
+        }
+      }
+    }
+
+    // log stage completed event
     logEvent(event, flushLogger = true)
   }
 
@@ -234,8 +266,18 @@ private[spark] class EventLoggingListener(
     }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    if (shouldLogStageExecutorMetrics) {
+      // For the active stages, record any new peak values for the memory metrics for the executor
+      event.executorUpdates.foreach { executorUpdates =>
+        liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
+          val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
+            event.execId, new ExecutorMetrics())
+          peakMetrics.compareAndUpdatePeakValues(executorUpdates)
+        }
+      }
+    }
+  }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
@@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends Logging {
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val codecMap = Map.empty[String, CompressionCodec]
 
   /**
    * Write metadata about an event log to the given stream.

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8a112f6..293e836 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
 
 import org.apache.spark.{SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.ui.SparkUI
@@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorUpdates: Option[ExecutorMetrics] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+    execId: String,
+    stageId: Int,
+    stageAttemptId: Int,
+    executorMetrics: ExecutorMetrics)
   extends SparkListenerEvent
 
 @DeveloperApi
@@ -265,6 +283,13 @@ private[spark] trait SparkListenerInterface {
   def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
 
   /**
+   * Called with the peak memory metrics for a given (executor, stage) combination. Note that this
+   * is only present when reading from the event log (as in the history server), and is never
+   * called in a live application.
+   */
+  def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit
+
+  /**
    * Called when the driver registers a new executor.
    */
   def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
@@ -361,6 +386,9 @@ abstract class SparkListener extends SparkListenerInterface {
   override def onExecutorMetricsUpdate(
       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
 
+  override def onStageExecutorMetrics(
+      executorMetrics: SparkListenerStageExecutorMetrics): Unit = { }
+
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
 
   override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index ff19cc6..8f6b7ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -57,6 +57,8 @@ private[spark] trait SparkListenerBus
         listener.onApplicationEnd(applicationEnd)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         listener.onExecutorMetricsUpdate(metricsUpdate)
+      case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
+        listener.onStageExecutorMetrics(stageExecutorMetrics)
       case executorAdded: SparkListenerExecutorAdded =>
         listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 95f7ae4..94221eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AccumulatorV2
@@ -74,14 +75,15 @@ private[spark] trait TaskScheduler {
   def defaultParallelism(): Int
 
   /**
-   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
-   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
+   * Update metrics for in-progress tasks and executor metrics, and let the master know that the
+   * BlockManager is still alive. Return true if the driver knows about the given block manager.
+   * Otherwise, return false, indicating that the block manager should re-register.
    */
   def executorHeartbeatReceived(
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-      blockManagerId: BlockManagerId): Boolean
+      blockManagerId: BlockManagerId,
+      executorUpdates: ExecutorMetrics): Boolean
 
   /**
    * Get an application ID associated with the job.

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8b71170..4f870e85 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -28,6 +28,7 @@ import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.rpc.RpcEndpoint
@@ -508,14 +509,15 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
-   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
+   * Update metrics for in-progress tasks and executor metrics, and let the master know that the
+   * BlockManager is still alive. Return true if the driver knows about the given block manager.
+   * Otherwise, return false, indicating that the block manager should re-register.
    */
   override def executorHeartbeatReceived(
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-      blockManagerId: BlockManagerId): Boolean = {
+      blockManagerId: BlockManagerId,
+      executorMetrics: ExecutorMetrics): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
       accumUpdates.flatMap { case (id, updates) =>
@@ -525,7 +527,8 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
-    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
+    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId,
+      executorMetrics)
   }
 
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 91b75e4..304d092 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
@@ -66,6 +66,7 @@ private[spark] class AppStatusListener(
   private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private val liveExecutors = new HashMap[String, LiveExecutor]()
+  private val deadExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
@@ -204,6 +205,19 @@ private[spark] class AppStatusListener(
           update(rdd, now)
         }
       }
+      if (isExecutorActiveForLiveStages(exec)) {
+        // the executor was running for a currently active stage, so save it for now in
+        // deadExecutors, and remove when there are no active stages overlapping with the
+        // executor.
+        deadExecutors.put(event.executorId, exec)
+      }
+    }
+  }
+
+  /** Was the specified executor active for any currently live stages? */
+  private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
+    liveStages.values.asScala.exists { stage =>
+      stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
     }
   }
 
@@ -641,6 +655,9 @@ private[spark] class AppStatusListener(
       }
     }
 
+    // remove any dead executors that were not running for any currently active stages
+    deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
+
     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
     kvstore.write(appSummary)
   }
@@ -692,6 +709,31 @@ private[spark] class AppStatusListener(
         }
       }
     }
+
+    // check if there is a new peak value for any of the executor level memory metrics
+    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
+    // for the live UI.
+    event.executorUpdates.foreach { updates =>
+      liveExecutors.get(event.execId).foreach { exec =>
+        if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+          maybeUpdate(exec, now)
+        }
+      }
+    }
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
+    val now = System.nanoTime()
+
+    // check if there is a new peak value for any of the executor level memory metrics,
+    // while reading from the log. SparkListenerStageExecutorMetrics are only processed
+    // when reading logs.
+    liveExecutors.get(executorMetrics.execId)
+      .orElse(deadExecutors.get(executorMetrics.execId)).map { exec =>
+      if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
+        update(exec, now)
+      }
+    }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 79e3f13..a0b2458 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.HashMap
 import com.google.common.collect.Interners
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.RDDInfo
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  val peakExecutorMetrics = new ExecutorMetrics()
+
   def hostname: String = if (host != null) host else hostPort.split(":")(0)
 
   override protected def doUpdate(): Any = {
@@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
       Option(removeReason),
       executorLogs,
       memoryMetrics,
-      blacklistedInStages)
+      blacklistedInStages,
+      Some(peakExecutorMetrics).filter(_.isSet))
     new ExecutorSummaryWrapper(info)
   }
-
 }
 
 private class LiveExecutorStageSummary(

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 971d7e9..77466b6 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -22,9 +22,14 @@ import java.util.Date
 import scala.xml.{NodeSeq, Text}
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonSerializer, SerializerProvider}
+import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.metrics.ExecutorMetricType
 
 case class ApplicationInfo private[spark](
     id: String,
@@ -98,7 +103,10 @@ class ExecutorSummary private[spark](
     val removeReason: Option[String],
     val executorLogs: Map[String, String],
     val memoryMetrics: Option[MemoryMetrics],
-    val blacklistedInStages: Set[Int])
+    val blacklistedInStages: Set[Int],
+    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+    val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
     val usedOnHeapStorageMemory: Long,
@@ -106,6 +114,33 @@ class MemoryMetrics private[spark](
     val totalOnHeapStorageMemory: Long,
     val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+    extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+      jsonParser: JsonParser,
+      deserializationContext: DeserializationContext): Option[ExecutorMetrics] = {
+    val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+      new TypeReference[Option[Map[String, java.lang.Long]]] {})
+    metricsMap.map(metrics => new ExecutorMetrics(metrics))
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+    extends JsonSerializer[Option[ExecutorMetrics]] {
+  override def serialize(
+      metrics: Option[ExecutorMetrics],
+      jsonGenerator: JsonGenerator,
+      serializerProvider: SerializerProvider): Unit = {
+    metrics.foreach { m: ExecutorMetrics =>
+      val metricsMap = ExecutorMetricType.values.map { metricType =>
+            metricType.name -> m.getMetricValue(metricType)
+      }.toMap
+      jsonGenerator.writeObject(metricsMap)
+    }
+  }
+}
+
 class JobData private[spark](
     val jobId: Int,
     val name: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461..0cd8612 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -98,6 +99,8 @@ private[spark] object JsonProtocol {
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         executorMetricsUpdateToJson(metricsUpdate)
+      case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
+        stageExecutorMetricsToJson(stageExecutorMetrics)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
       case _ => parse(mapper.writeValueAsString(event))
@@ -236,6 +239,7 @@ private[spark] object JsonProtocol {
   def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
+    val executorMetrics = metricsUpdate.executorUpdates.map(executorMetricsToJson(_))
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
@@ -243,7 +247,16 @@ private[spark] object JsonProtocol {
       ("Stage ID" -> stageId) ~
       ("Stage Attempt ID" -> stageAttemptId) ~
       ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
-    })
+    }) ~
+    ("Executor Metrics Updated" -> executorMetrics)
+  }
+
+  def stageExecutorMetricsToJson(metrics: SparkListenerStageExecutorMetrics): JValue = {
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageExecutorMetrics) ~
+    ("Executor ID" -> metrics.execId) ~
+    ("Stage ID" -> metrics.stageId) ~
+    ("Stage Attempt ID" -> metrics.stageAttemptId) ~
+    ("Executor Metrics" -> executorMetricsToJson(metrics.executorMetrics))
   }
 
   def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated): JValue = {
@@ -379,6 +392,14 @@ private[spark] object JsonProtocol {
     ("Updated Blocks" -> updatedBlocks)
   }
 
+  /** Convert executor metrics to JSON. */
+  def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
+    val metrics = ExecutorMetricType.values.map{ metricType =>
+      JField(metricType.name, executorMetrics.getMetricValue(metricType))
+     }
+    JObject(metrics: _*)
+  }
+
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
     val reason = Utils.getFormattedClassName(taskEndReason)
     val json: JObject = taskEndReason match {
@@ -531,6 +552,7 @@ private[spark] object JsonProtocol {
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+    val stageExecutorMetrics = Utils.getFormattedClassName(SparkListenerStageExecutorMetrics)
     val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
   }
 
@@ -555,6 +577,7 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+      case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
       case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
@@ -585,6 +608,15 @@ private[spark] object JsonProtocol {
     SparkListenerTaskGettingResult(taskInfo)
   }
 
+  /** Extract the executor metrics from JSON. */
+  def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+    val metrics =
+      ExecutorMetricType.values.map { metric =>
+        metric.name -> jsonOption(json \ metric.name).map(_.extract[Long]).getOrElse(0L)
+      }.toMap
+    new ExecutorMetrics(metrics)
+  }
+
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
     val stageAttemptId =
@@ -691,7 +723,18 @@ private[spark] object JsonProtocol {
         (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
       (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
+    val executorUpdates = jsonOption(json \ "Executor Metrics Updated").map {
+      executorUpdate => executorMetricsFromJson(executorUpdate)
+    }
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, executorUpdates)
+  }
+
+  def stageExecutorMetricsFromJson(json: JValue): SparkListenerStageExecutorMetrics = {
+    val execId = (json \ "Executor ID").extract[String]
+    val stageId = (json \ "Stage ID").extract[Int]
+    val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+    val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics")
+    SparkListenerStageExecutorMetrics(execId, stageId, stageAttemptId, executorMetrics)
   }
 
   def blockUpdateFromJson(json: JValue): SparkListenerBlockUpdated = {

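For reference, a minimal usage sketch (not part of this commit) of round-tripping the new stage-level event through the JsonProtocol methods added above. The classes involved are private[spark], so this would only compile inside Spark's own sources or tests, and the metric values are placeholders:

    import org.apache.spark.executor.ExecutorMetrics
    import org.apache.spark.metrics.ExecutorMetricType
    import org.apache.spark.scheduler.SparkListenerStageExecutorMetrics
    import org.apache.spark.util.JsonProtocol

    // Build a metrics snapshot keyed by metric name; the 0L values are placeholders.
    val peaks: Map[String, Long] = ExecutorMetricType.values.map(_.name -> 0L).toMap
    val event = SparkListenerStageExecutorMetrics("1", 0, 0, new ExecutorMetrics(peaks))

    // Serialize: the JSON carries "Event", "Executor ID", "Stage ID",
    // "Stage Attempt ID" and an "Executor Metrics" object with one field
    // per ExecutorMetricType.
    val json = JsonProtocol.stageExecutorMetricsToJson(event)

    // Deserialize back into an event carrying the same peak values.
    val restored = JsonProtocol.stageExecutorMetricsFromJson(json)
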
http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 4fecf84..eea6f59 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 4fecf84..7bc7f31 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
new file mode 100644
index 0000000..9bf2086
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
@@ -0,0 +1,314 @@
+[ {
+  "id" : "driver",
+  "hostPort" : "node0033.grid.company.com:60749",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 0,
+  "maxTasks" : 0,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : false,
+  "maxMemory" : 1043437977,
+  "addTime" : "2018-04-19T23:55:05.107GMT",
+  "executorLogs" : { },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 1043437977,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 905801,
+    "JVMOffHeapMemory" : 205304696,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 905801,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 397602,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 629553808,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "7",
+  "hostPort" : "node6340.grid.company.com:5933",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:49.826GMT",
+  "executorLogs" : {
+    "stdout" : "http://node6340.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000009/edlu/stdout?start=-4096",
+    "stderr" : "http://node6340.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000009/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ]
+}, {
+  "id" : "6",
+  "hostPort" : "node6644.grid.company.com:8445",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:47.549GMT",
+  "executorLogs" : {
+    "stdout" : "http://node6644.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000008/edlu/stdout?start=-4096",
+    "stderr" : "http://node6644.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000008/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ]
+}, {
+  "id" : "5",
+  "hostPort" : "node2477.grid.company.com:20123",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 1,
+  "totalTasks" : 1,
+  "totalDuration" : 9252,
+  "totalGCTime" : 920,
+  "totalInputBytes" : 36838295,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 355051,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:43.160GMT",
+  "executorLogs" : {
+    "stdout" : "http://node2477.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000007/edlu/stdout?start=-4096",
+    "stderr" : "http://node2477.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000007/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ]
+}, {
+  "id" : "4",
+  "hostPort" : "node4243.grid.company.com:16084",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 3,
+  "totalTasks" : 3,
+  "totalDuration" : 15645,
+  "totalGCTime" : 405,
+  "totalInputBytes" : 87272855,
+  "totalShuffleRead" : 438675,
+  "totalShuffleWrite" : 26773039,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:12.278GMT",
+  "executorLogs" : {
+    "stdout" : "http://node4243.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000006/edlu/stdout?start=-4096",
+    "stderr" : "http://node4243.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000006/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 63104457,
+    "JVMOffHeapMemory" : 95657456,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 100853193,
+    "OnHeapExecutionMemory" : 37748736,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 126261,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 518613056,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "3",
+  "hostPort" : "node0998.grid.company.com:45265",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 1,
+  "totalTasks" : 1,
+  "totalDuration" : 14491,
+  "totalGCTime" : 342,
+  "totalInputBytes" : 50409514,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 31362123,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:12.088GMT",
+  "executorLogs" : {
+    "stdout" : "http://node0998.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000005/edlu/stdout?start=-4096",
+    "stderr" : "http://node0998.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000005/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 69535048,
+    "JVMOffHeapMemory" : 90709624,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 69535048,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 87796,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 726805712,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "2",
+  "hostPort" : "node4045.grid.company.com:29262",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 1,
+  "totalTasks" : 1,
+  "totalDuration" : 14113,
+  "totalGCTime" : 326,
+  "totalInputBytes" : 50423423,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 22950296,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:12.471GMT",
+  "executorLogs" : {
+    "stdout" : "http://node4045.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000004/edlu/stdout?start=-4096",
+    "stderr" : "http://node4045.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000004/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 58468944,
+    "JVMOffHeapMemory" : 91208368,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 58468944,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 87796,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 595946552,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "1",
+  "hostPort" : "node1404.grid.company.com:34043",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 3,
+  "totalTasks" : 3,
+  "totalDuration" : 15665,
+  "totalGCTime" : 471,
+  "totalInputBytes" : 98905018,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 20594744,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:11.695GMT",
+  "executorLogs" : {
+    "stdout" : "http://node1404.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000002/edlu/stdout?start=-4096",
+    "stderr" : "http://node1404.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000002/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 47962185,
+    "JVMOffHeapMemory" : 100519936,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 47962185,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 98230,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 755008624,
+    "OffHeapStorageMemory" : 0
+  }
+} ]

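The expectation file above mirrors the executor summaries returned by the history server's REST API, including the new peakMemoryMetrics section. As a minimal sketch (host and port are assumptions for a locally running history server, not part of this commit; the application id is the one used in the test data), the endpoint can be fetched with:

    import scala.io.Source

    // Default history server port is 18080; adjust for your deployment.
    val url = "http://localhost:18080/api/v1/applications/" +
      "application_1506645932520_24630151/executors"

    // Returns a JSON array shaped like the expectation file above.
    val executorsJson = Source.fromURL(url).mkString
    println(executorsJson)
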
http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 79950b0..9e1e65a 100644
--- a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -28,19 +43,4 @@
     "startTimeEpoch" : 1515492942372,
     "endTimeEpoch" : 1515493477606
   } ]
-}, {
-  "id" : "app-20161116163331-0000",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2016-11-16T22:33:29.916GMT",
-    "endTime" : "2016-11-16T22:33:40.587GMT",
-    "lastUpdated" : "",
-    "duration" : 10671,
-    "sparkUser" : "jose",
-    "completed" : true,
-    "appSparkVersion" : "2.1.0-SNAPSHOT",
-    "lastUpdatedEpoch" : 0,
-    "startTimeEpoch" : 1479335609916,
-    "endTimeEpoch" : 1479335620587
-  } ]
 } ]

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 7d60977..28c6bf1 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index dfbfd8a..f547b79 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -101,4 +116,4 @@
     "startTimeEpoch" : 1430917380880,
     "endTimeEpoch" : 1430917380890
   } ]
-} ]
\ No newline at end of file
+} ]

