You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mc...@apache.org on 2018/09/07 17:42:51 UTC

[1/3] spark git commit: [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

Repository: spark
Updated Branches:
  refs/heads/master 458f5011b -> 9241e1e7e


http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index b705556..de479db 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -28,7 +28,7 @@ import org.mockito.Matchers._
 import org.mockito.Mockito.{mock, spy, verify, when}
 import org.scalatest.{BeforeAndAfterEach, PrivateMethodTester}
 
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -77,7 +77,7 @@ class HeartbeatReceiverSuite
     heartbeatReceiverClock = new ManualClock
     heartbeatReceiver = new HeartbeatReceiver(sc, heartbeatReceiverClock)
     heartbeatReceiverRef = sc.env.rpcEnv.setupEndpoint("heartbeat", heartbeatReceiver)
-    when(scheduler.executorHeartbeatReceived(any(), any(), any())).thenReturn(true)
+    when(scheduler.executorHeartbeatReceived(any(), any(), any(), any())).thenReturn(true)
   }
 
   /**
@@ -213,8 +213,10 @@ class HeartbeatReceiverSuite
       executorShouldReregister: Boolean): Unit = {
     val metrics = TaskMetrics.empty
     val blockManagerId = BlockManagerId(executorId, "localhost", 12345)
+    val executorUpdates = new ExecutorMetrics(Array(123456L, 543L, 12345L, 1234L, 123L,
+      12L, 432L, 321L, 654L, 765L))
     val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
-      Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId))
+      Heartbeat(executorId, Array(1L -> metrics.accumulators()), blockManagerId, executorUpdates))
     if (executorShouldReregister) {
       assert(response.reregisterBlockManager)
     } else {
@@ -223,7 +225,8 @@ class HeartbeatReceiverSuite
       verify(scheduler).executorHeartbeatReceived(
         Matchers.eq(executorId),
         Matchers.eq(Array(1L -> metrics.accumulators())),
-        Matchers.eq(blockManagerId))
+        Matchers.eq(blockManagerId),
+        Matchers.eq(executorUpdates))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 11b2912..11a2db8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -82,6 +82,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
       .set("spark.history.fs.update.interval", "0")
       .set("spark.testing", "true")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
+      .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
     conf.setAll(extraConf)
     provider = new FsHistoryProvider(conf)
     provider.checkForLogs()
@@ -128,6 +129,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     "succeeded&failed job list json" ->
       "applications/local-1422981780767/jobs?status=succeeded&status=failed",
     "executor list json" -> "applications/local-1422981780767/executors",
+    "executor list with executor metrics json" ->
+      "applications/application_1506645932520_24630151/executors",
     "stage list json" -> "applications/local-1422981780767/stages",
     "complete stage list json" -> "applications/local-1422981780767/stages?status=complete",
     "failed stage list json" -> "applications/local-1422981780767/stages?status=failed",

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4e87deb..365eab0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -140,7 +141,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def executorHeartbeatReceived(
         execId: String,
         accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-        blockManagerId: BlockManagerId): Boolean = true
+        blockManagerId: BlockManagerId,
+        executorUpdates: ExecutorMetrics): Boolean = true
     override def submitTasks(taskSet: TaskSet) = {
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
@@ -660,7 +662,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       override def executorHeartbeatReceived(
           execId: String,
           accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-          blockManagerId: BlockManagerId): Boolean = true
+          blockManagerId: BlockManagerId,
+          executorMetrics: ExecutorMetrics): Boolean = true
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
       override def workerRemoved(workerId: String, host: String, message: String): Unit = {}
       override def applicationAttemptId(): Option[String] = None

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index a9e92fa..cecd699 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -19,7 +19,9 @@ package org.apache.spark.scheduler
 
 import java.io.{File, FileOutputStream, InputStream, IOException}
 
+import scala.collection.immutable.Map
 import scala.collection.mutable
+import scala.collection.mutable.Set
 import scala.io.Source
 
 import org.apache.hadoop.fs.Path
@@ -29,11 +31,14 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.io._
-import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.{JsonProtocol, Utils}
 
+
 /**
  * Test whether EventLoggingListener logs events properly.
  *
@@ -43,6 +48,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter
   with Logging {
+
   import EventLoggingListenerSuite._
 
   private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -137,6 +143,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         "a fine:mind$dollar{bills}.1", None, Some("lz4")))
   }
 
+  test("Executor metrics update") {
+    testStageExecutorMetricsEventLogging()
+  }
+
   /* ----------------- *
    * Actual test logic *
    * ----------------- */
@@ -251,6 +261,214 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     }
   }
 
+  /**
+   * Test stage executor metrics logging functionality. This checks that peak
+   * values from SparkListenerExecutorMetricsUpdate events during a stage are
+   * logged in a StageExecutorMetrics event for each executor at stage completion.
+   */
+  private def testStageExecutorMetricsEventLogging() {
+    val conf = getLoggingConf(testDirPath, None)
+    val logName = "stageExecutorMetrics-test"
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
+    val listenerBus = new LiveListenerBus(conf)
+
+    // Events to post.
+    val events = Array(
+      SparkListenerApplicationStart("executionMetrics", None,
+        1L, "update", None),
+      createExecutorAddedEvent(1),
+      createExecutorAddedEvent(2),
+      createStageSubmittedEvent(0),
+      // receive 3 metric updates from each executor with just stage 0 running,
+      // with different peak updates for each executor
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L))),
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L))),
+      // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L))),
+      // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L))),
+      // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L))),
+      // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L))),
+      // now start stage 1, one more metric update for each executor, and new
+      // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks
+      createStageSubmittedEvent(1),
+      // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7; initialize stage 1 peaks
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L))),
+      // exec 2: new stage 0 peaks for metrics at indexes: 0, 1, 2, 3, 6, 7, 9;
+      // initialize stage 1 peaks
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L))),
+      // complete stage 0, and 3 more updates for each executor with just
+      // stage 1 running
+      createStageCompletedEvent(0),
+      // exec 1: new stage 1 peaks for metrics at indexes: 0, 1, 3
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L))),
+      // enew ExecutorMetrics(xec 2: new stage 1 peaks for metrics at indexes: 3, 4, 7, 8
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L))),
+      // exec 1: new stage 1 peaks for metrics at indexes: 0, 4, 5, 7
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L))),
+      // exec 2: new stage 1 peak for metrics at index: 7
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L))),
+      // exec 1: no new stage 1 peaks
+      createExecutorMetricsUpdateEvent(1,
+        new ExecutorMetrics(Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L))),
+      createExecutorRemovedEvent(1),
+      // exec 2: new stage 1 peak for metrics at index: 6
+      createExecutorMetricsUpdateEvent(2,
+        new ExecutorMetrics(Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L))),
+      createStageCompletedEvent(1),
+      SparkListenerApplicationEnd(1000L))
+
+    // play the events for the event logger
+    eventLogger.start()
+    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
+    listenerBus.addToEventLogQueue(eventLogger)
+    events.foreach(event => listenerBus.post(event))
+    listenerBus.stop()
+    eventLogger.stop()
+
+    // expected StageExecutorMetrics, for the given stage id and executor id
+    val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] =
+      Map(
+        ((0, "1"),
+          new SparkListenerStageExecutorMetrics("1", 0, 0,
+            new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L)))),
+        ((0, "2"),
+          new SparkListenerStageExecutorMetrics("2", 0, 0,
+            new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L)))),
+        ((1, "1"),
+          new SparkListenerStageExecutorMetrics("1", 1, 0,
+            new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L)))),
+        ((1, "2"),
+          new SparkListenerStageExecutorMetrics("2", 1, 0,
+            new ExecutorMetrics(Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L)))))
+
+    // Verify the log file contains the expected events.
+    // Posted events should be logged, except for ExecutorMetricsUpdate events -- these
+    // are consolidated, and the peak values for each stage are logged at stage end.
+    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
+    try {
+      val lines = readLines(logData)
+      val logStart = SparkListenerLogStart(SPARK_VERSION)
+      assert(lines.size === 14)
+      assert(lines(0).contains("SparkListenerLogStart"))
+      assert(lines(1).contains("SparkListenerApplicationStart"))
+      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      var logIdx = 1
+      events.foreach {event =>
+        event match {
+          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+          case stageCompleted: SparkListenerStageCompleted =>
+            val execIds = Set[String]()
+            (1 to 2).foreach { _ =>
+              val execId = checkStageExecutorMetrics(lines(logIdx),
+                stageCompleted.stageInfo.stageId, expectedMetricsEvents)
+              execIds += execId
+              logIdx += 1
+            }
+            assert(execIds.size == 2) // check that each executor was logged
+            checkEvent(lines(logIdx), event)
+            logIdx += 1
+        case _ =>
+          checkEvent(lines(logIdx), event)
+          logIdx += 1
+        }
+      }
+    } finally {
+      logData.close()
+    }
+  }
+
+  private def createStageSubmittedEvent(stageId: Int) = {
+    SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details"))
+  }
+
+  private def createStageCompletedEvent(stageId: Int) = {
+    SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details"))
+  }
+
+  private def createExecutorAddedEvent(executorId: Int) = {
+    SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
+  }
+
+  private def createExecutorRemovedEvent(executorId: Int) = {
+    SparkListenerExecutorRemoved(0L, executorId.toString, "test")
+  }
+
+  private def createExecutorMetricsUpdateEvent(
+      executorId: Int,
+      executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.incDiskBytesSpilled(111)
+    taskMetrics.incMemoryBytesSpilled(222)
+    val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
+    SparkListenerExecutorMetricsUpdate(executorId.toString, accum, Some(executorMetrics))
+  }
+
+  /** Check that the Spark history log line matches the expected event. */
+  private def checkEvent(line: String, event: SparkListenerEvent): Unit = {
+    assert(line.contains(event.getClass.toString.split("\\.").last))
+    val parsed = JsonProtocol.sparkEventFromJson(parse(line))
+    assert(parsed.getClass === event.getClass)
+    (event, parsed) match {
+      case (expected: SparkListenerStageSubmitted, actual: SparkListenerStageSubmitted) =>
+        // accumulables can be different, so only check the stage Id
+        assert(expected.stageInfo.stageId == actual.stageInfo.stageId)
+      case (expected: SparkListenerStageCompleted, actual: SparkListenerStageCompleted) =>
+        // accumulables can be different, so only check the stage Id
+        assert(expected.stageInfo.stageId == actual.stageInfo.stageId)
+      case (expected: SparkListenerEvent, actual: SparkListenerEvent) =>
+        assert(expected === actual)
+    }
+  }
+
+  /**
+   * Check that the Spark history log line is an StageExecutorMetrics event, and matches the
+   * expected value for the stage and executor.
+   *
+   * @param line the Spark history log line
+   * @param stageId the stage ID the ExecutorMetricsUpdate is associated with
+   * @param expectedEvents map of expected ExecutorMetricsUpdate events, for (stageId, executorId)
+   */
+  private def checkStageExecutorMetrics(
+      line: String,
+      stageId: Int,
+      expectedEvents: Map[(Int, String), SparkListenerStageExecutorMetrics]): String = {
+    JsonProtocol.sparkEventFromJson(parse(line)) match {
+      case executorMetrics: SparkListenerStageExecutorMetrics =>
+          expectedEvents.get((stageId, executorMetrics.execId)) match {
+            case Some(expectedMetrics) =>
+              assert(executorMetrics.execId === expectedMetrics.execId)
+              assert(executorMetrics.stageId === expectedMetrics.stageId)
+              assert(executorMetrics.stageAttemptId === expectedMetrics.stageAttemptId)
+              ExecutorMetricType.values.foreach { metricType =>
+                assert(executorMetrics.executorMetrics.getMetricValue(metricType) ===
+                  expectedMetrics.executorMetrics.getMetricValue(metricType))
+              }
+            case None =>
+              assert(false)
+        }
+        executorMetrics.execId
+      case _ =>
+        fail("expecting SparkListenerStageExecutorMetrics")
+    }
+  }
+
   private def readLines(in: InputStream): Seq[String] = {
     Source.fromInputStream(in).getLines().toSeq
   }
@@ -299,6 +517,7 @@ object EventLoggingListenerSuite {
       conf.set("spark.eventLog.compress", "true")
       conf.set("spark.io.compression.codec", codec)
     }
+    conf.set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
     conf
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index b470591..0621c98 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AccumulatorV2
@@ -92,5 +93,6 @@ private class DummyTaskScheduler extends TaskScheduler {
   def executorHeartbeatReceived(
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-      blockManagerId: BlockManagerId): Boolean = true
+      blockManagerId: BlockManagerId,
+      executorMetrics: ExecutorMetrics): Boolean = true
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index e24d550..d1113c7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -22,6 +22,7 @@ import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.hadoop.fs.Path
+import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
@@ -217,7 +218,9 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
 
     // Verify the same events are replayed in the same order
     assert(sc.eventLogger.isDefined)
-    val originalEvents = sc.eventLogger.get.loggedEvents
+    val originalEvents = sc.eventLogger.get.loggedEvents.filter { e =>
+      !JsonProtocol.sparkEventFromJson(e).isInstanceOf[SparkListenerStageExecutorMetrics]
+    }
     val replayedEvents = eventMonster.loggedEvents
     originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
       // Don't compare the JSON here because accumulators in StageInfo may be out of order

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index ea80fea..d0c2dc4 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -22,18 +22,19 @@ import java.lang.{Integer => JInteger, Long => JLong}
 import java.util.{Arrays, Date, Properties}
 
 import scala.collection.JavaConverters._
+import scala.collection.immutable.Map
 import scala.reflect.{classTag, ClassTag}
 
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
 import org.apache.spark.util.Utils
-import org.apache.spark.util.kvstore._
 
 class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
@@ -1263,6 +1264,130 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     }
   }
 
+  test("executor metrics updates") {
+    val listener = new AppStatusListener(store, conf, true)
+
+    val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    listener.onStageSubmitted(createStageSubmittedEvent(0))
+    // receive 3 metric updates from each executor with just stage 0 running,
+    // with different peak updates for each executor
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)))
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)))
+    // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)))
+    // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)))
+    // exec 1: new stage 0 peaks for metrics at indexes: 5, 7
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)))
+    // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)))
+    // now start stage 1, one more metric update for each executor, and new
+    // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks
+    listener.onStageSubmitted(createStageSubmittedEvent(1))
+    // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)))
+    // exec 2: new stage 0 peaks for metrics at indexes: 0, 1, 2, 3, 6, 7, 9
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(7000L, 80L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)))
+    // complete stage 0, and 3 more updates for each executor with just
+    // stage 1 running
+    listener.onStageCompleted(createStageCompletedEvent(0))
+    // exec 1: new stage 1 peaks for metrics at indexes: 0, 1, 3
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)))
+    // exec 2: new stage 1 peaks for metrics at indexes: 3, 4, 7, 8
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)))
+    // exec 1: new stage 1 peaks for metrics at indexes: 0, 4, 5, 7
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)))
+    // exec 2: new stage 1 peak for metrics at index: 7
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)))
+    // exec 1: no new stage 1 peaks
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(1,
+      Array(5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)))
+    listener.onExecutorRemoved(createExecutorRemovedEvent(1))
+    // exec 2: new stage 1 peak for metrics at index: 6
+    listener.onExecutorMetricsUpdate(createExecutorMetricsUpdateEvent(2,
+      Array(4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)))
+    listener.onStageCompleted(createStageCompletedEvent(1))
+
+    // expected peak values for each executor
+    val expectedValues = Map(
+      "1" -> new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 100L, 55L, 70L, 20L)),
+      "2" -> new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 80L, 40L)))
+
+    // check that the stored peak values match the expected values
+    expectedValues.foreach { case (id, metrics) =>
+      check[ExecutorSummaryWrapper](id) { exec =>
+        assert(exec.info.id === id)
+        exec.info.peakMemoryMetrics match {
+          case Some(actual) =>
+            ExecutorMetricType.values.foreach { metricType =>
+              assert(actual.getMetricValue(metricType) === metrics.getMetricValue(metricType))
+            }
+          case _ =>
+            assert(false)
+        }
+      }
+    }
+  }
+
+  test("stage executor metrics") {
+    // simulate reading in StageExecutorMetrics events from the history log
+    val listener = new AppStatusListener(store, conf, false)
+    val driver = BlockManagerId(SparkContext.DRIVER_IDENTIFIER, "localhost", 42)
+
+    listener.onExecutorAdded(createExecutorAddedEvent(1))
+    listener.onExecutorAdded(createExecutorAddedEvent(2))
+    listener.onStageSubmitted(createStageSubmittedEvent(0))
+    listener.onStageSubmitted(createStageSubmittedEvent(1))
+    listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("1", 0, 0,
+      new ExecutorMetrics(Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))))
+    listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("2", 0, 0,
+      new ExecutorMetrics(Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))))
+     listener.onStageCompleted(createStageCompletedEvent(0))
+    // executor 1 is removed before stage 1 has finished, the stage executor metrics
+    // are logged afterwards and should still be used to update the executor metrics.
+    listener.onExecutorRemoved(createExecutorRemovedEvent(1))
+    listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("1", 1, 0,
+      new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))))
+    listener.onStageExecutorMetrics(SparkListenerStageExecutorMetrics("2", 1, 0,
+      new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))
+    listener.onStageCompleted(createStageCompletedEvent(1))
+
+    // expected peak values for each executor
+    val expectedValues = Map(
+      "1" -> new ExecutorMetrics(Array(7000L, 70L, 50L, 30L, 60L, 30L, 100L, 55L, 70L, 20L)),
+      "2" -> new ExecutorMetrics(Array(7000L, 80L, 50L, 40L, 10L, 30L, 50L, 60L, 80L, 40L)))
+
+    // check that the stored peak values match the expected values
+    for ((id, metrics) <- expectedValues) {
+      check[ExecutorSummaryWrapper](id) { exec =>
+        assert(exec.info.id === id)
+        exec.info.peakMemoryMetrics match {
+          case Some(actual) =>
+            ExecutorMetricType.values.foreach { metricType =>
+              assert(actual.getMetricValue(metricType) === metrics.getMetricValue(metricType))
+            }
+          case _ =>
+            assert(false)
+        }
+      }
+    }
+  }
+
   private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
 
   private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
@@ -1300,4 +1425,37 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
 
   }
 
+  /** Create a stage submitted event for the specified stage Id. */
+  private def createStageSubmittedEvent(stageId: Int) = {
+    SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details"))
+  }
+
+  /** Create a stage completed event for the specified stage Id. */
+  private def createStageCompletedEvent(stageId: Int) = {
+    SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0,
+      Seq.empty, Seq.empty, "details"))
+  }
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorAddedEvent(executorId: Int) = {
+    SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
+  }
+
+  /** Create an executor added event for the specified executor Id. */
+  private def createExecutorRemovedEvent(executorId: Int) = {
+    SparkListenerExecutorRemoved(10L, executorId.toString, "test")
+  }
+
+  /** Create an executor metrics update event, with the specified executor metrics values. */
+  private def createExecutorMetricsUpdateEvent(
+      executorId: Int,
+      executorMetrics: Array[Long]): SparkListenerExecutorMetricsUpdate = {
+    val taskMetrics = TaskMetrics.empty
+    taskMetrics.incDiskBytesSpilled(111)
+    taskMetrics.incMemoryBytesSpilled(222)
+    val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
+    SparkListenerExecutorMetricsUpdate(executorId.toString, accum,
+      Some(new ExecutorMetrics(executorMetrics)))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 74b72d9..1e0d2af 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -30,6 +30,7 @@ import org.scalatest.exceptions.TestFailedException
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -94,11 +95,17 @@ class JsonProtocolSuite extends SparkFunSuite {
         makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
           .accumulators().map(AccumulatorSuite.makeInfo)
           .zipWithIndex.map { case (a, i) => a.copy(id = i) }
-      SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)))
+      val executorUpdates = new ExecutorMetrics(
+        Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L, 321L, 654L, 765L))
+      SparkListenerExecutorMetricsUpdate("exec3", Seq((1L, 2, 3, accumUpdates)),
+        Some(executorUpdates))
     }
     val blockUpdated =
       SparkListenerBlockUpdated(BlockUpdatedInfo(BlockManagerId("Stars",
         "In your multitude...", 300), RDDBlockId(0, 0), StorageLevel.MEMORY_ONLY, 100L, 0L))
+    val stageExecutorMetrics =
+      SparkListenerStageExecutorMetrics("1", 2, 3,
+        new ExecutorMetrics(Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L, 321L, 654L, 765L)))
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -124,6 +131,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(nodeUnblacklisted, nodeUnblacklistedJsonString)
     testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
     testEvent(blockUpdated, blockUpdatedJsonString)
+    testEvent(stageExecutorMetrics, stageExecutorMetricsJsonString)
   }
 
   test("Dependent Classes") {
@@ -419,6 +427,30 @@ class JsonProtocolSuite extends SparkFunSuite {
       exceptionFailure.accumUpdates, oldExceptionFailure.accumUpdates, (x, y) => x == y)
   }
 
+  test("ExecutorMetricsUpdate backward compatibility: executor metrics update") {
+    // executorMetricsUpdate was added in 2.4.0.
+    val executorMetricsUpdate = makeExecutorMetricsUpdate("1", true, true)
+    val oldExecutorMetricsUpdateJson =
+      JsonProtocol.executorMetricsUpdateToJson(executorMetricsUpdate)
+        .removeField( _._1 == "Executor Metrics Updated")
+    val exepectedExecutorMetricsUpdate = makeExecutorMetricsUpdate("1", true, false)
+    assertEquals(exepectedExecutorMetricsUpdate,
+      JsonProtocol.executorMetricsUpdateFromJson(oldExecutorMetricsUpdateJson))
+  }
+
+  test("executorMetricsFromJson backward compatibility: handle missing metrics") {
+    // any missing metrics should be set to 0
+    val executorMetrics = new ExecutorMetrics(
+      Array(12L, 23L, 45L, 67L, 78L, 89L, 90L, 123L, 456L, 789L))
+    val oldExecutorMetricsJson =
+      JsonProtocol.executorMetricsToJson(executorMetrics)
+        .removeField( _._1 == "MappedPoolMemory")
+    val expectedExecutorMetrics = new ExecutorMetrics(
+      Array(12L, 23L, 45L, 67L, 78L, 89L, 90L, 123L, 456L, 0L))
+    assertEquals(expectedExecutorMetrics,
+      JsonProtocol.executorMetricsFromJson(oldExecutorMetricsJson))
+  }
+
   test("AccumulableInfo value de/serialization") {
     import InternalAccumulator._
     val blocks = Seq[(BlockId, BlockStatus)](
@@ -435,7 +467,6 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), blocks, JString(blocks.toString))
     testAccumValue(Some("anything"), 123, JString("123"))
   }
-
 }
 
 
@@ -565,6 +596,13 @@ private[spark] object JsonProtocolSuite extends Assertions {
             assert(stageAttemptId1 === stageAttemptId2)
             assertSeqEquals[AccumulableInfo](updates1, updates2, (a, b) => a.equals(b))
           })
+        assertOptionEquals(e1.executorUpdates, e2.executorUpdates,
+        (e1: ExecutorMetrics, e2: ExecutorMetrics) => assertEquals(e1, e2))
+      case (e1: SparkListenerStageExecutorMetrics, e2: SparkListenerStageExecutorMetrics) =>
+        assert(e1.execId === e2.execId)
+        assert(e1.stageId === e2.stageId)
+        assert(e1.stageAttemptId === e2.stageAttemptId)
+        assertEquals(e1.executorMetrics, e2.executorMetrics)
       case (e1, e2) =>
         assert(e1 === e2)
       case _ => fail("Events don't match in types!")
@@ -715,6 +753,12 @@ private[spark] object JsonProtocolSuite extends Assertions {
       assertStackTraceElementEquals)
   }
 
+  private def assertEquals(metrics1: ExecutorMetrics, metrics2: ExecutorMetrics) {
+    ExecutorMetricType.values.foreach { metricType =>
+      assert(metrics1.getMetricValue(metricType) === metrics2.getMetricValue(metricType))
+    }
+  }
+
   private def assertJsonStringEquals(expected: String, actual: String, metadata: String) {
     val expectedJson = pretty(parse(expected))
     val actualJson = pretty(parse(actual))
@@ -765,7 +809,6 @@ private[spark] object JsonProtocolSuite extends Assertions {
     assert(ste1 === ste2)
   }
 
-
   /** ----------------------------------- *
    | Util methods for constructing events |
    * ------------------------------------ */
@@ -820,6 +863,27 @@ private[spark] object JsonProtocolSuite extends Assertions {
     new AccumulableInfo(id, Some(s"Accumulable$id"), Some(s"delta$id"), Some(s"val$id"),
       internal, countFailedValues, metadata)
 
+  /** Creates an SparkListenerExecutorMetricsUpdate event */
+  private def makeExecutorMetricsUpdate(
+      execId: String,
+      includeTaskMetrics: Boolean,
+      includeExecutorMetrics: Boolean): SparkListenerExecutorMetricsUpdate = {
+    val taskMetrics =
+      if (includeTaskMetrics) {
+        Seq((1L, 1, 1, Seq(makeAccumulableInfo(1, false, false, None),
+          makeAccumulableInfo(2, false, false, None))))
+      } else {
+        Seq()
+      }
+    val executorMetricsUpdate =
+      if (includeExecutorMetrics) {
+        Some(new ExecutorMetrics(Array(123456L, 543L, 0L, 0L, 0L, 0L, 0L, 0L, 0L, 0L)))
+       } else {
+        None
+      }
+    SparkListenerExecutorMetricsUpdate(execId, taskMetrics, executorMetricsUpdate)
+  }
+
   /**
    * Creates a TaskMetrics object describing a task that read data from Hadoop (if hasHadoopInput is
    * set to true) or read data from a shuffle otherwise.
@@ -2007,7 +2071,42 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |        }
       |      ]
       |    }
-      |  ]
+      |  ],
+      |  "Executor Metrics Updated" : {
+      |    "JVMHeapMemory" : 543,
+      |    "JVMOffHeapMemory" : 123456,
+      |    "OnHeapExecutionMemory" : 12345,
+      |    "OffHeapExecutionMemory" : 1234,
+      |    "OnHeapStorageMemory" : 123,
+      |    "OffHeapStorageMemory" : 12,
+      |    "OnHeapUnifiedMemory" : 432,
+      |    "OffHeapUnifiedMemory" : 321,
+      |    "DirectPoolMemory" : 654,
+      |    "MappedPoolMemory" : 765
+      |  }
+      |
+      |}
+    """.stripMargin
+
+  private val stageExecutorMetricsJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerStageExecutorMetrics",
+      |  "Executor ID": "1",
+      |  "Stage ID": 2,
+      |  "Stage Attempt ID": 3,
+      |  "Executor Metrics" : {
+      |    "JVMHeapMemory" : 543,
+      |    "JVMOffHeapMemory" : 123456,
+      |    "OnHeapExecutionMemory" : 12345,
+      |    "OffHeapExecutionMemory" : 1234,
+      |    "OnHeapStorageMemory" : 123,
+      |    "OffHeapStorageMemory" : 12,
+      |    "OnHeapUnifiedMemory" : 432,
+      |    "OffHeapUnifiedMemory" : 321,
+      |    "DirectPoolMemory" : 654,
+      |    "MappedPoolMemory" : 765
+      |  }
       |}
     """.stripMargin
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 466135e..7779500 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -81,6 +81,7 @@ app-20180109111548-0000
 app-20161115172038-0000
 app-20161116163331-0000
 application_1516285256255_0012
+application_1506645932520_24630151
 local-1422981759269
 local-1422981780767
 local-1425081759269

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7ff783d..55dc2b8 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,12 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.apply"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.copy"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate.this"),
+    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate$"),
+
     // [SPARK-25248] add package private methods to TaskContext
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.markTaskFailed"),
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.TaskContext.markInterrupted"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[3/3] spark git commit: [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

Posted by mc...@apache.org.
[SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

Add new executor level memory metrics (JVM used memory, on/off heap execution memory, on/off heap storage memory, on/off heap unified memory, direct memory, and mapped memory), and expose via the executors REST API. This information will help provide insight into how executor and driver JVM memory is used, and for the different memory regions. It can be used to help determine good values for spark.executor.memory, spark.driver.memory, spark.memory.fraction, and spark.memory.storageFraction.

## What changes were proposed in this pull request?

An ExecutorMetrics class is added, with jvmUsedHeapMemory, jvmUsedNonHeapMemory, onHeapExecutionMemory, offHeapExecutionMemory, onHeapStorageMemory, and offHeapStorageMemory, onHeapUnifiedMemory, offHeapUnifiedMemory, directMemory and mappedMemory. The new ExecutorMetrics is sent by executors to the driver as part of the Heartbeat. A heartbeat is added for the driver as well, to collect these metrics for the driver.

The EventLoggingListener store information about the peak values for each metric, per active stage and executor. When a StageCompleted event is seen, a StageExecutorsMetrics event will be logged for each executor, with peak values for the stage.

The AppStatusListener records the peak values for each memory metric.

The new memory metrics are added to the executors REST API.

## How was this patch tested?

New unit tests have been added. This was also tested on our cluster.

Author: Edwina Lu <ed...@linkedin.com>
Author: Imran Rashid <ir...@cloudera.com>
Author: edwinalu <ed...@gmail.com>

Closes #21221 from edwinalu/SPARK-23429.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9241e1e7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9241e1e7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9241e1e7

Branch: refs/heads/master
Commit: 9241e1e7e66574cfafa68791771959dfc39c9684
Parents: 458f501
Author: Edwina Lu <ed...@linkedin.com>
Authored: Fri Sep 7 10:42:46 2018 -0700
Committer: mcheah <mc...@palantir.com>
Committed: Fri Sep 7 10:42:46 2018 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkFirehoseListener.java |   6 +
 .../org/apache/spark/HeartbeatReceiver.scala    |   8 +-
 .../scala/org/apache/spark/Heartbeater.scala    |  71 +++++
 .../scala/org/apache/spark/SparkContext.scala   |  20 ++
 .../org/apache/spark/executor/Executor.scala    |  36 +--
 .../apache/spark/executor/ExecutorMetrics.scala |  81 +++++
 .../apache/spark/internal/config/package.scala  |   5 +
 .../org/apache/spark/memory/MemoryManager.scala |  28 ++
 .../spark/metrics/ExecutorMetricType.scala      | 104 ++++++
 .../apache/spark/scheduler/DAGScheduler.scala   |   9 +-
 .../spark/scheduler/EventLoggingListener.scala  |  54 +++-
 .../apache/spark/scheduler/SparkListener.scala  |  32 +-
 .../spark/scheduler/SparkListenerBus.scala      |   2 +
 .../apache/spark/scheduler/TaskScheduler.scala  |  10 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |  13 +-
 .../apache/spark/status/AppStatusListener.scala |  44 ++-
 .../org/apache/spark/status/LiveEntity.scala    |   9 +-
 .../org/apache/spark/status/api/v1/api.scala    |  39 ++-
 .../org/apache/spark/util/JsonProtocol.scala    |  47 ++-
 .../application_list_json_expectation.json      |  15 +
 .../completed_app_list_json_expectation.json    |  15 +
 ..._with_executor_metrics_json_expectation.json | 314 +++++++++++++++++++
 .../limit_app_list_json_expectation.json        |  30 +-
 .../minDate_app_list_json_expectation.json      |  15 +
 .../minEndDate_app_list_json_expectation.json   |  17 +-
 .../application_1506645932520_24630151          |  63 ++++
 .../apache/spark/HeartbeatReceiverSuite.scala   |  11 +-
 .../deploy/history/HistoryServerSuite.scala     |   3 +
 .../spark/scheduler/DAGSchedulerSuite.scala     |   7 +-
 .../scheduler/EventLoggingListenerSuite.scala   | 221 ++++++++++++-
 .../scheduler/ExternalClusterManagerSuite.scala |   4 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |   5 +-
 .../spark/status/AppStatusListenerSuite.scala   | 162 +++++++++-
 .../apache/spark/util/JsonProtocolSuite.scala   | 107 ++++++-
 dev/.rat-excludes                               |   1 +
 project/MimaExcludes.scala                      |   6 +
 36 files changed, 1531 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 94c5c11..731f6fc 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -104,6 +104,12 @@ public class SparkFirehoseListener implements SparkListenerInterface {
   }
 
   @Override
+  public final void onStageExecutorMetrics(
+      SparkListenerStageExecutorMetrics executorMetrics) {
+    onEvent(executorMetrics);
+  }
+
+  @Override
   public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
     onEvent(executorAdded);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
index bcbc8df..ab0ae55 100644
--- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.{ScheduledFuture, TimeUnit}
 import scala.collection.mutable
 import scala.concurrent.Future
 
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
@@ -37,7 +38,8 @@ import org.apache.spark.util._
 private[spark] case class Heartbeat(
     executorId: String,
     accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], // taskId -> accumulator updates
-    blockManagerId: BlockManagerId)
+    blockManagerId: BlockManagerId,
+    executorUpdates: ExecutorMetrics) // executor level updates
 
 /**
  * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
@@ -119,14 +121,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
       context.reply(true)
 
     // Messages received from executors
-    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
+    case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId, executorMetrics) =>
       if (scheduler != null) {
         if (executorLastSeen.contains(executorId)) {
           executorLastSeen(executorId) = clock.getTimeMillis()
           eventLoopThread.submit(new Runnable {
             override def run(): Unit = Utils.tryLogNonFatalError {
               val unknownExecutor = !scheduler.executorHeartbeatReceived(
-                executorId, accumUpdates, blockManagerId)
+                executorId, accumUpdates, blockManagerId, executorMetrics)
               val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
               context.reply(response)
             }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/Heartbeater.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Heartbeater.scala b/core/src/main/scala/org/apache/spark/Heartbeater.scala
new file mode 100644
index 0000000..5ba1b9b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/Heartbeater.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.internal.Logging
+import org.apache.spark.memory.MemoryManager
+import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * Creates a heartbeat thread which will call the specified reportHeartbeat function at
+ * intervals of intervalMs.
+ *
+ * @param memoryManager the memory manager for execution and storage memory.
+ * @param reportHeartbeat the heartbeat reporting function to call.
+ * @param name the thread name for the heartbeater.
+ * @param intervalMs the interval between heartbeats.
+ */
+private[spark] class Heartbeater(
+    memoryManager: MemoryManager,
+    reportHeartbeat: () => Unit,
+    name: String,
+    intervalMs: Long) extends Logging {
+  // Executor for the heartbeat task
+  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor(name)
+
+  /** Schedules a task to report a heartbeat. */
+  def start(): Unit = {
+    // Wait a random interval so the heartbeats don't end up in sync
+    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+
+    val heartbeatTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartbeat())
+    }
+    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
+  }
+
+  /** Stops the heartbeat thread. */
+  def stop(): Unit = {
+    heartbeater.shutdown()
+    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+  }
+
+  /**
+   * Get the current executor level metrics. These are returned as an array, with the index
+   * determined by MetricGetter.values
+   */
+  def getCurrentMetrics(): ExecutorMetrics = {
+    val metrics = ExecutorMetricType.values.map(_.getMetricValue(memoryManager)).toArray
+    new ExecutorMetrics(metrics)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e5b1e0e..d943087 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -213,6 +213,7 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _files: Seq[String] = _
   private var _shutdownHookRef: AnyRef = _
   private var _statusStore: AppStatusStore = _
+  private var _heartbeater: Heartbeater = _
 
   /* ------------------------------------------------------------------------------------- *
    | Accessors and public fields. These provide access to the internal state of the        |
@@ -496,6 +497,11 @@ class SparkContext(config: SparkConf) extends Logging {
     _dagScheduler = new DAGScheduler(this)
     _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
 
+    // create and start the heartbeater for collecting memory metrics
+    _heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat, "driver-heartbeater",
+      conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
+    _heartbeater.start()
+
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
     // constructor
     _taskScheduler.start()
@@ -1959,6 +1965,12 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.tryLogNonFatalError {
       _eventLogger.foreach(_.stop())
     }
+    if (_heartbeater != null) {
+      Utils.tryLogNonFatalError {
+        _heartbeater.stop()
+      }
+      _heartbeater = null
+    }
     if (env != null && _heartbeatReceiver != null) {
       Utils.tryLogNonFatalError {
         env.rpcEnv.stop(_heartbeatReceiver)
@@ -2429,6 +2441,14 @@ class SparkContext(config: SparkConf) extends Logging {
     }
   }
 
+  /** Reports heartbeat metrics for the driver. */
+  private def reportHeartBeat(): Unit = {
+    val driverUpdates = _heartbeater.getCurrentMetrics()
+    val accumUpdates = new Array[(Long, Int, Int, Seq[AccumulableInfo])](0)
+    listenerBus.post(SparkListenerExecutorMetricsUpdate("driver", accumUpdates,
+      Some(driverUpdates)))
+  }
+
   // In order to prevent multiple SparkContexts from being active at the same time, mark this
   // context as having finished construction.
   // NOTE: this must be placed at the end of the SparkContext constructor.

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 86b1957..072277c 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
 import org.apache.spark.rpc.RpcTimeout
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, Task, TaskDescription}
+import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
@@ -148,7 +148,8 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
   // Executor for the heartbeat task.
-  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
+  private val heartbeater = new Heartbeater(env.memoryManager, reportHeartBeat,
+    "executor-heartbeater", conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s"))
 
   // must be initialized before running startDriverHeartbeat()
   private val heartbeatReceiverRef =
@@ -167,7 +168,7 @@ private[spark] class Executor(
    */
   private var heartbeatFailures = 0
 
-  startDriverHeartbeater()
+  heartbeater.start()
 
   private[executor] def numRunningTasks: Int = runningTasks.size()
 
@@ -216,8 +217,12 @@ private[spark] class Executor(
 
   def stop(): Unit = {
     env.metricsSystem.report()
-    heartbeater.shutdown()
-    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    try {
+      heartbeater.stop()
+    } catch {
+      case NonFatal(e) =>
+        logWarning("Unable to stop heartbeater", e)
+     }
     threadPool.shutdown()
     if (!isLocal) {
       env.stop()
@@ -787,6 +792,9 @@ private[spark] class Executor(
     val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
     val curGCTime = computeTotalGcTime()
 
+    // get executor level memory metrics
+    val executorUpdates = heartbeater.getCurrentMetrics()
+
     for (taskRunner <- runningTasks.values().asScala) {
       if (taskRunner.task != null) {
         taskRunner.task.metrics.mergeShuffleReadMetrics()
@@ -795,7 +803,8 @@ private[spark] class Executor(
       }
     }
 
-    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
+    val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId,
+      executorUpdates)
     try {
       val response = heartbeatReceiverRef.askSync[HeartbeatResponse](
           message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
@@ -815,21 +824,6 @@ private[spark] class Executor(
         }
     }
   }
-
-  /**
-   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
-   */
-  private def startDriverHeartbeater(): Unit = {
-    val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-
-    // Wait a random interval so the heartbeats don't end up in sync
-    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
-
-    val heartbeatTask = new Runnable() {
-      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
-    }
-    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
-  }
 }
 
 private[spark] object Executor {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
new file mode 100644
index 0000000..2933f3b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorMetrics.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.executor
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.metrics.ExecutorMetricType
+
+/**
+ * :: DeveloperApi ::
+ * Metrics tracked for executors and the driver.
+ *
+ * Executor-level metrics are sent from each executor to the driver as part of the Heartbeat.
+ */
+@DeveloperApi
+class ExecutorMetrics private[spark] extends Serializable {
+
+  // Metrics are indexed by MetricGetter.values
+  private val metrics = new Array[Long](ExecutorMetricType.values.length)
+
+  // the first element is initialized to -1, indicating that the values for the array
+  // haven't been set yet.
+  metrics(0) = -1
+
+  /** Returns the value for the specified metricType. */
+  def getMetricValue(metricType: ExecutorMetricType): Long = {
+    metrics(ExecutorMetricType.metricIdxMap(metricType))
+  }
+
+  /** Returns true if the values for the metrics have been set, false otherwise. */
+  def isSet(): Boolean = metrics(0) > -1
+
+  private[spark] def this(metrics: Array[Long]) {
+    this()
+    Array.copy(metrics, 0, this.metrics, 0, Math.min(metrics.size, this.metrics.size))
+  }
+
+  /**
+   * Constructor: create the ExecutorMetrics with the values specified.
+   *
+   * @param executorMetrics map of executor metric name to value
+   */
+  private[spark] def this(executorMetrics: Map[String, Long]) {
+    this()
+    (0 until ExecutorMetricType.values.length).foreach { idx =>
+      metrics(idx) = executorMetrics.getOrElse(ExecutorMetricType.values(idx).name, 0L)
+    }
+  }
+
+  /**
+   * Compare the specified executor metrics values with the current executor metric values,
+   * and update the value for any metrics where the new value for the metric is larger.
+   *
+   * @param executorMetrics the executor metrics to compare
+   * @return if there is a new peak value for any metric
+   */
+  private[spark] def compareAndUpdatePeakValues(executorMetrics: ExecutorMetrics): Boolean = {
+    var updated = false
+
+    (0 until ExecutorMetricType.values.length).foreach { idx =>
+       if (executorMetrics.metrics(idx) > metrics(idx)) {
+        updated = true
+        metrics(idx) = executorMetrics.metrics(idx)
+      }
+    }
+    updated
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 319e664..ee41bd1 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -69,6 +69,11 @@ package object config {
     .bytesConf(ByteUnit.KiB)
     .createWithDefaultString("100k")
 
+  private[spark] val EVENT_LOG_STAGE_EXECUTOR_METRICS =
+    ConfigBuilder("spark.eventLog.logStageExecutorMetrics.enabled")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val EVENT_LOG_OVERWRITE =
     ConfigBuilder("spark.eventLog.overwrite").booleanConf.createWithDefault(false)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 0641adc..4fde2d0 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -181,6 +181,34 @@ private[spark] abstract class MemoryManager(
   }
 
   /**
+   *  On heap execution memory currently in use, in bytes.
+   */
+  final def onHeapExecutionMemoryUsed: Long = synchronized {
+    onHeapExecutionMemoryPool.memoryUsed
+  }
+
+  /**
+   *  Off heap execution memory currently in use, in bytes.
+   */
+  final def offHeapExecutionMemoryUsed: Long = synchronized {
+    offHeapExecutionMemoryPool.memoryUsed
+  }
+
+  /**
+   *  On heap storage memory currently in use, in bytes.
+   */
+  final def onHeapStorageMemoryUsed: Long = synchronized {
+    onHeapStorageMemoryPool.memoryUsed
+  }
+
+  /**
+   *  Off heap storage memory currently in use, in bytes.
+   */
+  final def offHeapStorageMemoryUsed: Long = synchronized {
+    offHeapStorageMemoryPool.memoryUsed
+  }
+
+  /**
    * Returns the execution memory consumption, in bytes, for the given task.
    */
   private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
new file mode 100644
index 0000000..cd10dad
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/metrics/ExecutorMetricType.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.metrics
+
+import java.lang.management.{BufferPoolMXBean, ManagementFactory}
+import javax.management.ObjectName
+
+import org.apache.spark.memory.MemoryManager
+
+/**
+ * Executor metric types for executor-level metrics stored in ExecutorMetrics.
+ */
+sealed trait ExecutorMetricType {
+  private[spark] def getMetricValue(memoryManager: MemoryManager): Long
+  private[spark] val name = getClass().getName().stripSuffix("$").split("""\.""").last
+}
+
+private[spark] abstract class MemoryManagerExecutorMetricType(
+    f: MemoryManager => Long) extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    f(memoryManager)
+  }
+}
+
+private[spark] abstract class MBeanExecutorMetricType(mBeanName: String)
+  extends ExecutorMetricType {
+  private val bean = ManagementFactory.newPlatformMXBeanProxy(
+    ManagementFactory.getPlatformMBeanServer,
+    new ObjectName(mBeanName).toString, classOf[BufferPoolMXBean])
+
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    bean.getMemoryUsed
+  }
+}
+
+case object JVMHeapMemory extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    ManagementFactory.getMemoryMXBean.getHeapMemoryUsage().getUsed()
+  }
+}
+
+case object JVMOffHeapMemory extends ExecutorMetricType {
+  override private[spark] def getMetricValue(memoryManager: MemoryManager): Long = {
+    ManagementFactory.getMemoryMXBean.getNonHeapMemoryUsage().getUsed()
+  }
+}
+
+case object OnHeapExecutionMemory extends MemoryManagerExecutorMetricType(
+  _.onHeapExecutionMemoryUsed)
+
+case object OffHeapExecutionMemory extends MemoryManagerExecutorMetricType(
+  _.offHeapExecutionMemoryUsed)
+
+case object OnHeapStorageMemory extends MemoryManagerExecutorMetricType(
+  _.onHeapStorageMemoryUsed)
+
+case object OffHeapStorageMemory extends MemoryManagerExecutorMetricType(
+  _.offHeapStorageMemoryUsed)
+
+case object OnHeapUnifiedMemory extends MemoryManagerExecutorMetricType(
+  (m => m.onHeapExecutionMemoryUsed + m.onHeapStorageMemoryUsed))
+
+case object OffHeapUnifiedMemory extends MemoryManagerExecutorMetricType(
+  (m => m.offHeapExecutionMemoryUsed + m.offHeapStorageMemoryUsed))
+
+case object DirectPoolMemory extends MBeanExecutorMetricType(
+  "java.nio:type=BufferPool,name=direct")
+
+case object MappedPoolMemory extends MBeanExecutorMetricType(
+  "java.nio:type=BufferPool,name=mapped")
+
+private[spark] object ExecutorMetricType {
+  // List of all executor metric types
+  val values = IndexedSeq(
+    JVMHeapMemory,
+    JVMOffHeapMemory,
+    OnHeapExecutionMemory,
+    OffHeapExecutionMemory,
+    OnHeapStorageMemory,
+    OffHeapStorageMemory,
+    OnHeapUnifiedMemory,
+    OffHeapUnifiedMemory,
+    DirectPoolMemory,
+    MappedPoolMemory
+  )
+
+  // Map of executor metric type to its index in values.
+  val metricIdxMap =
+    Map[ExecutorMetricType, Int](ExecutorMetricType.values.zipWithIndex: _*)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 50c91da..4710835 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -35,7 +35,7 @@ import org.apache.commons.lang3.SerializationUtils
 
 import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.network.util.JavaUtils
@@ -264,8 +264,11 @@ private[spark] class DAGScheduler(
       execId: String,
       // (taskId, stageId, stageAttemptId, accumUpdates)
       accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
-      blockManagerId: BlockManagerId): Boolean = {
-    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates))
+      blockManagerId: BlockManagerId,
+      // executor metrics indexed by MetricGetter.values
+      executorUpdates: ExecutorMetrics): Boolean = {
+    listenerBus.post(SparkListenerExecutorMetricsUpdate(execId, accumUpdates,
+      Some(executorUpdates)))
     blockManagerMaster.driverEndpoint.askSync[Boolean](
       BlockManagerHeartbeat(blockManagerId), new RpcTimeout(600 seconds, "BlockManagerHeartbeat"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 69bc51c..1629e17 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -23,8 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.util.EnumSet
 import java.util.Locale
 
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, Map}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, FSDataOutputStream, Path}
@@ -36,6 +35,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
@@ -51,6 +51,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  *   spark.eventLog.overwrite - Whether to overwrite any existing files.
  *   spark.eventLog.dir - Path to the directory in which events are logged.
  *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
+ *   spark.eventLog.logStageExecutorMetrics.enabled - Whether to log stage executor metrics
  */
 private[spark] class EventLoggingListener(
     appId: String,
@@ -69,6 +70,7 @@ private[spark] class EventLoggingListener(
   private val shouldCompress = sparkConf.get(EVENT_LOG_COMPRESS)
   private val shouldOverwrite = sparkConf.get(EVENT_LOG_OVERWRITE)
   private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
+  private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
   private val testing = sparkConf.get(EVENT_LOG_TESTING)
   private val outputBufferSize = sparkConf.get(EVENT_LOG_OUTPUT_BUFFER_SIZE).toInt
   private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
@@ -93,6 +95,9 @@ private[spark] class EventLoggingListener(
   // Visible for tests only.
   private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
+  // map of (stageId, stageAttempt), to peak executor metrics for the stage
+  private val liveStageExecutorMetrics = Map.empty[(Int, Int), Map[String, ExecutorMetrics]]
+
   /**
    * Creates the log file in the configured log directory.
    */
@@ -155,7 +160,14 @@ private[spark] class EventLoggingListener(
   }
 
   // Events that do not trigger a flush
-  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = logEvent(event)
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
+    logEvent(event)
+    if (shouldLogStageExecutorMetrics) {
+      // record the peak metrics for the new stage
+      liveStageExecutorMetrics.put((event.stageInfo.stageId, event.stageInfo.attemptNumber()),
+        Map.empty[String, ExecutorMetrics])
+    }
+  }
 
   override def onTaskStart(event: SparkListenerTaskStart): Unit = logEvent(event)
 
@@ -169,6 +181,26 @@ private[spark] class EventLoggingListener(
 
   // Events that trigger a flush
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
+    if (shouldLogStageExecutorMetrics) {
+      // clear out any previous attempts, that did not have a stage completed event
+      val prevAttemptId = event.stageInfo.attemptNumber() - 1
+      for (attemptId <- 0 to prevAttemptId) {
+        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
+      }
+
+      // log the peak executor metrics for the stage, for each live executor,
+      // whether or not the executor is running tasks for the stage
+      val executorOpt = liveStageExecutorMetrics.remove(
+        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
+      executorOpt.foreach { execMap =>
+        execMap.foreach { case (executorId, peakExecutorMetrics) =>
+            logEvent(new SparkListenerStageExecutorMetrics(executorId, event.stageInfo.stageId,
+              event.stageInfo.attemptNumber(), peakExecutorMetrics))
+        }
+      }
+    }
+
+    // log stage completed event
     logEvent(event, flushLogger = true)
   }
 
@@ -234,8 +266,18 @@ private[spark] class EventLoggingListener(
     }
   }
 
-  // No-op because logging every update would be overkill
-  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
+  override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
+    if (shouldLogStageExecutorMetrics) {
+      // For the active stages, record any new peak values for the memory metrics for the executor
+      event.executorUpdates.foreach { executorUpdates =>
+        liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
+          val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
+            event.execId, new ExecutorMetrics())
+          peakMetrics.compareAndUpdatePeakValues(executorUpdates)
+        }
+      }
+    }
+  }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
@@ -296,7 +338,7 @@ private[spark] object EventLoggingListener extends Logging {
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val codecMap = Map.empty[String, CompressionCodec]
 
   /**
    * Write metadata about an event log to the given stream.

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 8a112f6..293e836 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -26,7 +26,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
 
 import org.apache.spark.{SparkConf, TaskEndReason}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 import org.apache.spark.ui.SparkUI
@@ -160,11 +160,29 @@ case class SparkListenerBlockUpdated(blockUpdatedInfo: BlockUpdatedInfo) extends
  * Periodic updates from executors.
  * @param execId executor id
  * @param accumUpdates sequence of (taskId, stageId, stageAttemptId, accumUpdates)
+ * @param executorUpdates executor level metrics updates
  */
 @DeveloperApi
 case class SparkListenerExecutorMetricsUpdate(
     execId: String,
-    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])])
+    accumUpdates: Seq[(Long, Int, Int, Seq[AccumulableInfo])],
+    executorUpdates: Option[ExecutorMetrics] = None)
+  extends SparkListenerEvent
+
+/**
+ * Peak metric values for the executor for the stage, written to the history log at stage
+ * completion.
+ * @param execId executor id
+ * @param stageId stage id
+ * @param stageAttemptId stage attempt
+ * @param executorMetrics executor level metrics, indexed by MetricGetter.values
+ */
+@DeveloperApi
+case class SparkListenerStageExecutorMetrics(
+    execId: String,
+    stageId: Int,
+    stageAttemptId: Int,
+    executorMetrics: ExecutorMetrics)
   extends SparkListenerEvent
 
 @DeveloperApi
@@ -265,6 +283,13 @@ private[spark] trait SparkListenerInterface {
   def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit
 
   /**
+   * Called with the peak memory metrics for a given (executor, stage) combination. Note that this
+   * is only present when reading from the event log (as in the history server), and is never
+   * called in a live application.
+   */
+  def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit
+
+  /**
    * Called when the driver registers a new executor.
    */
   def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit
@@ -361,6 +386,9 @@ abstract class SparkListener extends SparkListenerInterface {
   override def onExecutorMetricsUpdate(
       executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = { }
 
+  override def onStageExecutorMetrics(
+      executorMetrics: SparkListenerStageExecutorMetrics): Unit = { }
+
   override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { }
 
   override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { }

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index ff19cc6..8f6b7ad 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -57,6 +57,8 @@ private[spark] trait SparkListenerBus
         listener.onApplicationEnd(applicationEnd)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         listener.onExecutorMetricsUpdate(metricsUpdate)
+      case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
+        listener.onStageExecutorMetrics(stageExecutorMetrics)
       case executorAdded: SparkListenerExecutorAdded =>
         listener.onExecutorAdded(executorAdded)
       case executorRemoved: SparkListenerExecutorRemoved =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 95f7ae4..94221eb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.scheduler
 
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AccumulatorV2
@@ -74,14 +75,15 @@ private[spark] trait TaskScheduler {
   def defaultParallelism(): Int
 
   /**
-   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
-   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
+   * Update metrics for in-progress tasks and executor metrics, and let the master know that the
+   * BlockManager is still alive. Return true if the driver knows about the given block manager.
+   * Otherwise, return false, indicating that the block manager should re-register.
    */
   def executorHeartbeatReceived(
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-      blockManagerId: BlockManagerId): Boolean
+      blockManagerId: BlockManagerId,
+      executorUpdates: ExecutorMetrics): Boolean
 
   /**
    * Get an application ID associated with the job.

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 8b71170..4f870e85 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -28,6 +28,7 @@ import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.rpc.RpcEndpoint
@@ -508,14 +509,15 @@ private[spark] class TaskSchedulerImpl(
   }
 
   /**
-   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
-   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
-   * indicating that the block manager should re-register.
+   * Update metrics for in-progress tasks and executor metrics, and let the master know that the
+   * BlockManager is still alive. Return true if the driver knows about the given block manager.
+   * Otherwise, return false, indicating that the block manager should re-register.
    */
   override def executorHeartbeatReceived(
       execId: String,
       accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
-      blockManagerId: BlockManagerId): Boolean = {
+      blockManagerId: BlockManagerId,
+      executorMetrics: ExecutorMetrics): Boolean = {
     // (taskId, stageId, stageAttemptId, accumUpdates)
     val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
       accumUpdates.flatMap { case (id, updates) =>
@@ -525,7 +527,8 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
-    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId)
+    dagScheduler.executorHeartbeatReceived(execId, accumUpdatesWithTaskIds, blockManagerId,
+      executorMetrics)
   }
 
   def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index 91b75e4..304d092 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
@@ -66,6 +66,7 @@ private[spark] class AppStatusListener(
   private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private val liveExecutors = new HashMap[String, LiveExecutor]()
+  private val deadExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
   private val liveRDDs = new HashMap[Int, LiveRDD]()
   private val pools = new HashMap[String, SchedulerPool]()
@@ -204,6 +205,19 @@ private[spark] class AppStatusListener(
           update(rdd, now)
         }
       }
+      if (isExecutorActiveForLiveStages(exec)) {
+        // the executor was running for a currently active stage, so save it for now in
+        // deadExecutors, and remove when there are no active stages overlapping with the
+        // executor.
+        deadExecutors.put(event.executorId, exec)
+      }
+    }
+  }
+
+  /** Was the specified executor active for any currently live stages? */
+  private def isExecutorActiveForLiveStages(exec: LiveExecutor): Boolean = {
+    liveStages.values.asScala.exists { stage =>
+      stage.info.submissionTime.getOrElse(0L) < exec.removeTime.getTime
     }
   }
 
@@ -641,6 +655,9 @@ private[spark] class AppStatusListener(
       }
     }
 
+    // remove any dead executors that were not running for any currently active stages
+    deadExecutors.retain((execId, exec) => isExecutorActiveForLiveStages(exec))
+
     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
     kvstore.write(appSummary)
   }
@@ -692,6 +709,31 @@ private[spark] class AppStatusListener(
         }
       }
     }
+
+    // check if there is a new peak value for any of the executor level memory metrics
+    // for the live UI. SparkListenerExecutorMetricsUpdate events are only processed
+    // for the live UI.
+    event.executorUpdates.foreach { updates =>
+      liveExecutors.get(event.execId).foreach { exec =>
+        if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(updates)) {
+          maybeUpdate(exec, now)
+        }
+      }
+    }
+  }
+
+  override def onStageExecutorMetrics(executorMetrics: SparkListenerStageExecutorMetrics): Unit = {
+    val now = System.nanoTime()
+
+    // check if there is a new peak value for any of the executor level memory metrics,
+    // while reading from the log. SparkListenerStageExecutorMetrics are only processed
+    // when reading logs.
+    liveExecutors.get(executorMetrics.execId)
+      .orElse(deadExecutors.get(executorMetrics.execId)).map { exec =>
+      if (exec.peakExecutorMetrics.compareAndUpdatePeakValues(executorMetrics.executorMetrics)) {
+        update(exec, now)
+      }
+    }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 79e3f13..a0b2458 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.HashMap
 import com.google.common.collect.Interners
 
 import org.apache.spark.JobExecutionStatus
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.RDDInfo
@@ -268,6 +268,9 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
 
   def hasMemoryInfo: Boolean = totalOnHeap >= 0L
 
+  // peak values for executor level metrics
+  val peakExecutorMetrics = new ExecutorMetrics()
+
   def hostname: String = if (host != null) host else hostPort.split(":")(0)
 
   override protected def doUpdate(): Any = {
@@ -302,10 +305,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
       Option(removeReason),
       executorLogs,
       memoryMetrics,
-      blacklistedInStages)
+      blacklistedInStages,
+      Some(peakExecutorMetrics).filter(_.isSet))
     new ExecutorSummaryWrapper(info)
   }
-
 }
 
 private class LiveExecutorStageSummary(

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 971d7e9..77466b6 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -22,9 +22,14 @@ import java.util.Date
 import scala.xml.{NodeSeq, Text}
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties
-import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer, JsonSerializer, SerializerProvider}
+import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize}
 
 import org.apache.spark.JobExecutionStatus
+import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.metrics.ExecutorMetricType
 
 case class ApplicationInfo private[spark](
     id: String,
@@ -98,7 +103,10 @@ class ExecutorSummary private[spark](
     val removeReason: Option[String],
     val executorLogs: Map[String, String],
     val memoryMetrics: Option[MemoryMetrics],
-    val blacklistedInStages: Set[Int])
+    val blacklistedInStages: Set[Int],
+    @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
+    @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
+    val peakMemoryMetrics: Option[ExecutorMetrics])
 
 class MemoryMetrics private[spark](
     val usedOnHeapStorageMemory: Long,
@@ -106,6 +114,33 @@ class MemoryMetrics private[spark](
     val totalOnHeapStorageMemory: Long,
     val totalOffHeapStorageMemory: Long)
 
+/** deserializer for peakMemoryMetrics: convert map to ExecutorMetrics */
+private[spark] class ExecutorMetricsJsonDeserializer
+    extends JsonDeserializer[Option[ExecutorMetrics]] {
+  override def deserialize(
+      jsonParser: JsonParser,
+      deserializationContext: DeserializationContext): Option[ExecutorMetrics] = {
+    val metricsMap = jsonParser.readValueAs[Option[Map[String, Long]]](
+      new TypeReference[Option[Map[String, java.lang.Long]]] {})
+    metricsMap.map(metrics => new ExecutorMetrics(metrics))
+  }
+}
+/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */
+private[spark] class ExecutorMetricsJsonSerializer
+    extends JsonSerializer[Option[ExecutorMetrics]] {
+  override def serialize(
+      metrics: Option[ExecutorMetrics],
+      jsonGenerator: JsonGenerator,
+      serializerProvider: SerializerProvider): Unit = {
+    metrics.foreach { m: ExecutorMetrics =>
+      val metricsMap = ExecutorMetricType.values.map { metricType =>
+            metricType.name -> m.getMetricValue(metricType)
+      }.toMap
+      jsonGenerator.writeObject(metricsMap)
+    }
+  }
+}
+
 class JobData private[spark](
     val jobId: Int,
     val name: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 50c6461..0cd8612 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -31,6 +31,7 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark._
 import org.apache.spark.executor._
+import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.RDDOperationScope
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -98,6 +99,8 @@ private[spark] object JsonProtocol {
         logStartToJson(logStart)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
         executorMetricsUpdateToJson(metricsUpdate)
+      case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
+        stageExecutorMetricsToJson(stageExecutorMetrics)
       case blockUpdate: SparkListenerBlockUpdated =>
         blockUpdateToJson(blockUpdate)
       case _ => parse(mapper.writeValueAsString(event))
@@ -236,6 +239,7 @@ private[spark] object JsonProtocol {
   def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
+    val executorMetrics = metricsUpdate.executorUpdates.map(executorMetricsToJson(_))
     ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
     ("Executor ID" -> execId) ~
     ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
@@ -243,7 +247,16 @@ private[spark] object JsonProtocol {
       ("Stage ID" -> stageId) ~
       ("Stage Attempt ID" -> stageAttemptId) ~
       ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
-    })
+    }) ~
+    ("Executor Metrics Updated" -> executorMetrics)
+  }
+
+  def stageExecutorMetricsToJson(metrics: SparkListenerStageExecutorMetrics): JValue = {
+    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageExecutorMetrics) ~
+    ("Executor ID" -> metrics.execId) ~
+    ("Stage ID" -> metrics.stageId) ~
+    ("Stage Attempt ID" -> metrics.stageAttemptId) ~
+    ("Executor Metrics" -> executorMetricsToJson(metrics.executorMetrics))
   }
 
   def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated): JValue = {
@@ -379,6 +392,14 @@ private[spark] object JsonProtocol {
     ("Updated Blocks" -> updatedBlocks)
   }
 
+  /** Convert executor metrics to JSON. */
+  def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
+    val metrics = ExecutorMetricType.values.map{ metricType =>
+      JField(metricType.name, executorMetrics.getMetricValue(metricType))
+     }
+    JObject(metrics: _*)
+  }
+
   def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
     val reason = Utils.getFormattedClassName(taskEndReason)
     val json: JObject = taskEndReason match {
@@ -531,6 +552,7 @@ private[spark] object JsonProtocol {
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
     val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
+    val stageExecutorMetrics = Utils.getFormattedClassName(SparkListenerStageExecutorMetrics)
     val blockUpdate = Utils.getFormattedClassName(SparkListenerBlockUpdated)
   }
 
@@ -555,6 +577,7 @@ private[spark] object JsonProtocol {
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
       case `metricsUpdate` => executorMetricsUpdateFromJson(json)
+      case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
       case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
@@ -585,6 +608,15 @@ private[spark] object JsonProtocol {
     SparkListenerTaskGettingResult(taskInfo)
   }
 
+  /** Extract the executor metrics from JSON. */
+  def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+    val metrics =
+      ExecutorMetricType.values.map { metric =>
+        metric.name -> jsonOption(json \ metric.name).map(_.extract[Long]).getOrElse(0L)
+      }.toMap
+    new ExecutorMetrics(metrics)
+  }
+
   def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
     val stageId = (json \ "Stage ID").extract[Int]
     val stageAttemptId =
@@ -691,7 +723,18 @@ private[spark] object JsonProtocol {
         (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
       (taskId, stageId, stageAttemptId, updates)
     }
-    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates)
+    val executorUpdates = jsonOption(json \ "Executor Metrics Updated").map {
+      executorUpdate => executorMetricsFromJson(executorUpdate)
+    }
+    SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, executorUpdates)
+  }
+
+  def stageExecutorMetricsFromJson(json: JValue): SparkListenerStageExecutorMetrics = {
+    val execId = (json \ "Executor ID").extract[String]
+    val stageId = (json \ "Stage ID").extract[Int]
+    val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+    val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics")
+    SparkListenerStageExecutorMetrics(execId, stageId, stageAttemptId, executorMetrics)
   }
 
   def blockUpdateFromJson(json: JValue): SparkListenerBlockUpdated = {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 4fecf84..eea6f59 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 4fecf84..7bc7f31 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
new file mode 100644
index 0000000..9bf2086
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_with_executor_metrics_json_expectation.json
@@ -0,0 +1,314 @@
+[ {
+  "id" : "driver",
+  "hostPort" : "node0033.grid.company.com:60749",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 0,
+  "maxTasks" : 0,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : false,
+  "maxMemory" : 1043437977,
+  "addTime" : "2018-04-19T23:55:05.107GMT",
+  "executorLogs" : { },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 1043437977,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 905801,
+    "JVMOffHeapMemory" : 205304696,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 905801,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 397602,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 629553808,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "7",
+  "hostPort" : "node6340.grid.company.com:5933",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:49.826GMT",
+  "executorLogs" : {
+    "stdout" : "http://node6340.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000009/edlu/stdout?start=-4096",
+    "stderr" : "http://node6340.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000009/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ]
+}, {
+  "id" : "6",
+  "hostPort" : "node6644.grid.company.com:8445",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 0,
+  "totalTasks" : 0,
+  "totalDuration" : 0,
+  "totalGCTime" : 0,
+  "totalInputBytes" : 0,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 0,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:47.549GMT",
+  "executorLogs" : {
+    "stdout" : "http://node6644.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000008/edlu/stdout?start=-4096",
+    "stderr" : "http://node6644.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000008/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ]
+}, {
+  "id" : "5",
+  "hostPort" : "node2477.grid.company.com:20123",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 1,
+  "totalTasks" : 1,
+  "totalDuration" : 9252,
+  "totalGCTime" : 920,
+  "totalInputBytes" : 36838295,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 355051,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:43.160GMT",
+  "executorLogs" : {
+    "stdout" : "http://node2477.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000007/edlu/stdout?start=-4096",
+    "stderr" : "http://node2477.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000007/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ]
+}, {
+  "id" : "4",
+  "hostPort" : "node4243.grid.company.com:16084",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 3,
+  "totalTasks" : 3,
+  "totalDuration" : 15645,
+  "totalGCTime" : 405,
+  "totalInputBytes" : 87272855,
+  "totalShuffleRead" : 438675,
+  "totalShuffleWrite" : 26773039,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:12.278GMT",
+  "executorLogs" : {
+    "stdout" : "http://node4243.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000006/edlu/stdout?start=-4096",
+    "stderr" : "http://node4243.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000006/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 63104457,
+    "JVMOffHeapMemory" : 95657456,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 100853193,
+    "OnHeapExecutionMemory" : 37748736,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 126261,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 518613056,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "3",
+  "hostPort" : "node0998.grid.company.com:45265",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 1,
+  "totalTasks" : 1,
+  "totalDuration" : 14491,
+  "totalGCTime" : 342,
+  "totalInputBytes" : 50409514,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 31362123,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:12.088GMT",
+  "executorLogs" : {
+    "stdout" : "http://node0998.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000005/edlu/stdout?start=-4096",
+    "stderr" : "http://node0998.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000005/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 69535048,
+    "JVMOffHeapMemory" : 90709624,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 69535048,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 87796,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 726805712,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "2",
+  "hostPort" : "node4045.grid.company.com:29262",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 1,
+  "totalTasks" : 1,
+  "totalDuration" : 14113,
+  "totalGCTime" : 326,
+  "totalInputBytes" : 50423423,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 22950296,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:12.471GMT",
+  "executorLogs" : {
+    "stdout" : "http://node4045.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000004/edlu/stdout?start=-4096",
+    "stderr" : "http://node4045.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000004/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 58468944,
+    "JVMOffHeapMemory" : 91208368,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 58468944,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 87796,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 595946552,
+    "OffHeapStorageMemory" : 0
+  }
+}, {
+  "id" : "1",
+  "hostPort" : "node1404.grid.company.com:34043",
+  "isActive" : true,
+  "rddBlocks" : 0,
+  "memoryUsed" : 0,
+  "diskUsed" : 0,
+  "totalCores" : 1,
+  "maxTasks" : 1,
+  "activeTasks" : 0,
+  "failedTasks" : 0,
+  "completedTasks" : 3,
+  "totalTasks" : 3,
+  "totalDuration" : 15665,
+  "totalGCTime" : 471,
+  "totalInputBytes" : 98905018,
+  "totalShuffleRead" : 0,
+  "totalShuffleWrite" : 20594744,
+  "isBlacklisted" : false,
+  "maxMemory" : 956615884,
+  "addTime" : "2018-04-19T23:55:11.695GMT",
+  "executorLogs" : {
+    "stdout" : "http://node1404.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000002/edlu/stdout?start=-4096",
+    "stderr" : "http://node1404.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000002/edlu/stderr?start=-4096"
+  },
+  "memoryMetrics" : {
+    "usedOnHeapStorageMemory" : 0,
+    "usedOffHeapStorageMemory" : 0,
+    "totalOnHeapStorageMemory" : 956615884,
+    "totalOffHeapStorageMemory" : 0
+  },
+  "blacklistedInStages" : [ ],
+  "peakMemoryMetrics" : {
+    "OnHeapStorageMemory" : 47962185,
+    "JVMOffHeapMemory" : 100519936,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapUnifiedMemory" : 47962185,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 98230,
+    "MappedPoolMemory" : 0,
+    "JVMHeapMemory" : 755008624,
+    "OffHeapStorageMemory" : 0
+  }
+} ]

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 79950b0..9e1e65a 100644
--- a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -28,19 +43,4 @@
     "startTimeEpoch" : 1515492942372,
     "endTimeEpoch" : 1515493477606
   } ]
-}, {
-  "id" : "app-20161116163331-0000",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2016-11-16T22:33:29.916GMT",
-    "endTime" : "2016-11-16T22:33:40.587GMT",
-    "lastUpdated" : "",
-    "duration" : 10671,
-    "sparkUser" : "jose",
-    "completed" : true,
-    "appSparkVersion" : "2.1.0-SNAPSHOT",
-    "lastUpdatedEpoch" : 0,
-    "startTimeEpoch" : 1479335609916,
-    "endTimeEpoch" : 1479335620587
-  } ]
 } ]

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 7d60977..28c6bf1 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {

http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index dfbfd8a..f547b79 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1506645932520_24630151",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2018-04-19T23:54:42.734GMT",
+    "endTime" : "2018-04-19T23:56:29.134GMT",
+    "lastUpdated" : "",
+    "duration" : 106400,
+    "sparkUser" : "edlu",
+    "completed" : true,
+    "appSparkVersion" : "2.4.0-SNAPSHOT",
+    "lastUpdatedEpoch" : 0,
+    "startTimeEpoch" : 1524182082734,
+    "endTimeEpoch" : 1524182189134
+  } ]
+}, {
   "id" : "application_1516285256255_0012",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -101,4 +116,4 @@
     "startTimeEpoch" : 1430917380880,
     "endTimeEpoch" : 1430917380890
   } ]
-} ]
\ No newline at end of file
+} ]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/3] spark git commit: [SPARK-23429][CORE] Add executor memory metrics to heartbeat and expose in executors REST API

Posted by mc...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/9241e1e7/core/src/test/resources/spark-events/application_1506645932520_24630151
----------------------------------------------------------------------
diff --git a/core/src/test/resources/spark-events/application_1506645932520_24630151 b/core/src/test/resources/spark-events/application_1506645932520_24630151
new file mode 100644
index 0000000..c48ed74
--- /dev/null
+++ b/core/src/test/resources/spark-events/application_1506645932520_24630151
@@ -0,0 +1,63 @@
+{"Event":"SparkListenerLogStart","Spark Version":"2.4.0-SNAPSHOT"}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"node0033.grid.company.com","Port":60749},"Maximum Memory":1043437977,"Timestamp":1524182105107,"Maximum Onheap Memory":1043437977,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/usr/java/jdk1.8.0_31/jre","Java Version":"1.8.0_31 (Oracle Corporation)","Scala Version":"version 2.11.8"},"Spark Properties":{"spark.jars.ivySettings":"/export/apps/spark/commonconf/ivysettings.xml","spark.serializer":"org.apache.spark.serializer.KryoSerializer","spark.driver.host":"node0033.grid.company.com","spark.dynamicAllocation.sustainedSchedulerBacklogTimeout":"5","spark.eventLog.enabled":"true","spark.ui.port":"0","spark.driver.port":"57705","spark.shuffle.service.enabled":"true","spark.ui.acls.enable":"true","spark.reducer.maxSizeInFlight":"48m","spark.yarn.queue":"spark_default","spark.repl.class.uri":"spark://node0033.grid.company.com:57705/classes","spark.jars":"","spark.yarn.historyServer.address":"clustersh01.grid.company.com:18080","spark.memoryOverhead.multiplier.percent":"10","spark.repl.class.outputDir":"/grid/a/mapred/tmp/spark-21b68b4b-c1db-460e-a228-b87545d870f1/repl-58778a76-04c1-434d-b
 fb7-9a9b83afe718","spark.dynamicAllocation.cachedExecutorIdleTimeout":"1200","spark.yarn.access.namenodes":"hdfs://clusternn02.grid.company.com:9000","spark.app.name":"Spark shell","spark.dynamicAllocation.schedulerBacklogTimeout":"5","spark.yarn.security.credentials.hive.enabled":"false","spark.yarn.am.cores":"1","spark.memoryOverhead.min":"384","spark.scheduler.mode":"FIFO","spark.driver.memory":"2G","spark.executor.instances":"4","spark.isolated.classloader.additional.classes.prefix":"com_company_","spark.logConf":"true","spark.ui.showConsoleProgress":"true","spark.user.priority.jars":"*********(redacted)","spark.isolated.classloader":"true","spark.sql.sources.schemaStringLengthThreshold":"40000","spark.yarn.secondary.jars":"spark-avro_2.11-3.2.0.21.jar,grid-topology-1.0.jar","spark.reducer.maxBlocksInFlightPerAddress":"100","spark.dynamicAllocation.maxExecutors":"900","spark.yarn.appMasterEnv.LD_LIBRARY_PATH":"/export/apps/hadoop/latest/lib/native","spark.executor.id":"driver","
 spark.yarn.am.memory":"2G","spark.driver.cores":"1","spark.search.packages":"com.company.dali:dali-data-spark,com.company.spark-common:spark-common","spark.min.mem.vore.ratio":"5","spark.sql.sources.partitionOverwriteMode":"DYNAMIC","spark.submit.deployMode":"client","spark.yarn.maxAppAttempts":"1","spark.master":"yarn","spark.default.packages":"com.company.dali:dali-data-spark:8.+?classifier=all,com.company.spark-common:spark-common_2.10:0.+?","spark.isolated.classloader.default.jar":"*dali-data-spark*","spark.authenticate":"true","spark.eventLog.usexattr":"true","spark.ui.filters":"org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter","spark.executor.memory":"2G","spark.home":"/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51","spark.reducer.maxReqsInFlight":"10","spark.eventLog.dir":"hdfs://clusternn02.grid.company.com:9000/system/spark-history","spark.dynamicAllocation.enabled":"true","spark.sql.catalogImplementation":"hive","spark.isolated.classes":"org.apache.hadoop.hi
 ve.ql.io.CombineHiveInputFormat$CombineHiveInputSplit","spark.eventLog.compress":"true","spark.executor.cores":"1","spark.version":"2.1.0","spark.driver.appUIAddress":"http://node0033.grid.company.com:8364","spark.repl.local.jars":"file:///export/home/edlu/spark-avro_2.11-3.2.0.21.jar,file:///export/apps/hadoop/site/lib/grid-topology-1.0.jar","spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS":"clusterwp01.grid.company.com","spark.min.memory-gb.size":"10","spark.dynamicAllocation.minExecutors":"1","spark.dynamicAllocation.initialExecutors":"3","spark.expressionencoder.org.apache.avro.specific.SpecificRecord":"com.databricks.spark.avro.AvroEncoder$","spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES":"http://clusterwp01.grid.company.com:8080/proxy/application_1506645932520_24630151","spark.executorEnv.LD_LIBRARY_PATH":"/export/apps/hadoop/latest/lib/native","spark.dynamicAllocation.executorIdleTimeout":"150","spark
 .shell.auto.node.labeling":"true","spark.yarn.dist.jars":"file:///export/home/edlu/spark-avro_2.11-3.2.0.21.jar,file:///export/apps/hadoop/site/lib/grid-topology-1.0.jar","spark.app.id":"application_1506645932520_24630151","spark.ui.view.acls":"*"},"System Properties":{"java.io.tmpdir":"/tmp","line.separator":"\n","path.separator":":","sun.management.compiler":"HotSpot 64-Bit Tiered Compilers","SPARK_SUBMIT":"true","sun.cpu.endian":"little","java.specification.version":"1.8","java.vm.specification.name":"Java Virtual Machine Specification","java.vendor":"Oracle Corporation","java.vm.specification.version":"1.8","user.home":"*********(redacted)","file.encoding.pkg":"sun.io","sun.nio.ch.bugLevel":"","sun.arch.data.model":"64","sun.boot.library.path":"/usr/java/jdk1.8.0_31/jre/lib/amd64","user.dir":"*********(redacted)","java.library.path":"/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib","sun.cpu.isalist":"","os.arch":"amd64","java.vm.version":"25.31-b07","java.endorsed.d
 irs":"/usr/java/jdk1.8.0_31/jre/lib/endorsed","java.runtime.version":"1.8.0_31-b13","java.vm.info":"mixed mode","java.ext.dirs":"/usr/java/jdk1.8.0_31/jre/lib/ext:/usr/java/packages/lib/ext","java.runtime.name":"Java(TM) SE Runtime Environment","file.separator":"/","java.class.version":"52.0","scala.usejavacp":"true","java.specification.name":"Java Platform API Specification","sun.boot.class.path":"/usr/java/jdk1.8.0_31/jre/lib/resources.jar:/usr/java/jdk1.8.0_31/jre/lib/rt.jar:/usr/java/jdk1.8.0_31/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_31/jre/lib/jsse.jar:/usr/java/jdk1.8.0_31/jre/lib/jce.jar:/usr/java/jdk1.8.0_31/jre/lib/charsets.jar:/usr/java/jdk1.8.0_31/jre/lib/jfr.jar:/usr/java/jdk1.8.0_31/jre/classes","file.encoding":"UTF-8","user.timezone":"*********(redacted)","java.specification.vendor":"Oracle Corporation","sun.java.launcher":"SUN_STANDARD","os.version":"2.6.32-504.16.2.el6.x86_64","sun.os.patch.level":"unknown","java.vm.specification.vendor":"Oracle Corporation","user
 .country":"*********(redacted)","sun.jnu.encoding":"UTF-8","user.language":"*********(redacted)","java.vendor.url":"*********(redacted)","java.awt.printerjob":"sun.print.PSPrinterJob","java.awt.graphicsenv":"sun.awt.X11GraphicsEnvironment","awt.toolkit":"sun.awt.X11.XToolkit","os.name":"Linux","java.vm.vendor":"Oracle Corporation","java.vendor.url.bug":"*********(redacted)","user.name":"*********(redacted)","java.vm.name":"Java HotSpot(TM) 64-Bit Server VM","sun.java.command":"org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode client --class org.apache.spark.repl.Main --name Spark shell --jars /export/home/edlu/spark-avro_2.11-3.2.0.21.jar,/export/apps/hadoop/site/lib/grid-topology-1.0.jar --num-executors 4 spark-shell","java.home":"/usr/java/jdk1.8.0_31/jre","java.version":"1.8.0_31","sun.io.unicode.encoding":"UnicodeLittle"},"Classpath Entries":{"/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/guice-servlet-3.0.jar":"System Classpath","/export/home/edlu/spark
 -2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-mapper-asl-1.9.13.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/derby-10.12.1.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/htrace-core-3.0.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/api-asn1-api-1.0.0-M20.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/scala-reflect-2.11.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/datanucleus-rdbms-3.2.9.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-graphx_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/api-util-1.0.0-M20.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-yarn-client-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/base64-2.3.8.jar"
 :"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-auth-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/validation-api-1.1.0.Final.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hk2-utils-2.4.0-b34.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/zstd-jni-1.3.2-2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-yarn-api-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/objenesis-2.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/conf/":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/httpclient-4.5.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/kryo-shaded-3.0.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/scala-library-2.11.8.jar"
 :"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-net-3.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/xz-1.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/json4s-jackson_2.11-3.5.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javax.servlet-api-3.1.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-server-1.9.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-annotations-2.6.7.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-hadoop-1.8.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/activation-1.1.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spire_2.11-0.13.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/arpack_c
 ombined_all-0.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/libthrift-0.9.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/aircompressor-0.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-jackson-1.8.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hk2-api-2.4.0-b34.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/asm-3.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/apacheds-kerberos-codec-2.0.0-M15.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-hive_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/ivy-2.4.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javax.inject-2.4.0-b34.jar":"System Classpath","/export/apps/hadoop/site/etc/hadoop/":
 "System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/snappy-java-1.1.7.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/arrow-format-0.8.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/netty-all-4.1.17.Final.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/avro-ipc-1.7.7.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/xmlenc-0.52.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jdo-api-3.0.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/curator-client-2.7.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/antlr-runtime-3.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/pyrolite-4.13.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/scala-xml_2.11-1.0.5.jar":
 "System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-catalyst_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-collections-3.2.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/slf4j-api-1.7.16.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/stream-2.7.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-format-2.3.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/arrow-vector-0.8.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-yarn-server-web-proxy-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/htrace-core-3.1.0-incubating.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-sketch_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/
 home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-common-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hppc-0.7.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-core-asl-1.9.13.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-sql_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/univocity-parsers-2.5.9.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-math3-3.4.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-compiler-3.0.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-beanutils-1.7.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/java-xmlbuilder-1.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javax.inject-
 1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-annotations-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/netty-3.9.9.Final.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/aopalliance-repackaged-2.4.0-b34.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/zookeeper-3.4.6.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/guice-3.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/scala-compiler-2.11.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/eigenbase-properties-1.1.5.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/aopalliance-1.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-yarn_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-S
 NAPSHOT-bin-2.7.4.51/jars/JavaEWAH-0.3.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jsr305-1.3.9.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/libfb303-0.9.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javax.annotation-api-1.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-yarn-server-common-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-digester-1.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/metrics-jvm-3.1.5.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/curator-framework-2.7.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javax.ws.rs-api-2.0.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/paranamer-2.8.jar":"System Classpath","/expor
 t/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/janino-3.0.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-mapreduce-client-core-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-server-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/orc-core-1.4.3-nohive.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jsch-0.1.42.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/calcite-linq4j-1.2.0-incubating.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-unsafe_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-codec-1.10.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jtransforms-2.4.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/
 lz4-java-1.4.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/datanucleus-core-3.2.10.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/flatbuffers-1.2.0-3f79e055.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hive-exec-1.2.1.spark2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/avro-mapred-1.7.7-hadoop2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/stax-api-1.0.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/core-1.1.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/leveldbjni-all-1.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/datanucleus-api-jdo-3.2.6.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-databind-2.6.7.1.jar":"System Classpath","/export/home/edlu/spark
 -2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-dbcp-1.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-module-scala_2.11-2.6.7.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-lang3-3.5.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spire-macros_2.11-0.13.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-module-paranamer-2.7.9.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/slf4j-log4j12-1.7.16.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/chill-java-0.8.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jodd-core-3.5.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-pool-1.5.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/osgi-resource-locator-1.0.1
 .jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/minlog-1.3.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-mapreduce-client-common-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/gson-2.2.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/py4j-0.10.6.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-streaming_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-core-2.6.7.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/calcite-avatica-1.2.0-incubating.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/machinist_2.11-0.6.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/avro-1.7.7.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHO
 T-bin-2.7.4.51/jars/commons-beanutils-core-1.8.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/apacheds-i18n-2.0.0-M15.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-media-jaxb-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/snappy-0.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-mapreduce-client-app-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-hadoop-bundle-1.6.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jul-to-slf4j-1.7.16.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/metrics-graphite-3.1.5.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jcl-over-slf4j-1.7.16.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/metrics-core-3.1.5.j
 ar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-mllib-local_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/arrow-memory-0.8.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/breeze_2.11-0.13.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-guava-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-client-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/xercesImpl-2.9.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-tags_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javolution-5.5.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jetty-6.1.26.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT
 -bin-2.7.4.51/jars/joda-time-2.9.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/antlr-2.7.7.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-mapreduce-client-jobclient-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-lang-2.6.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/compress-lzf-1.0.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-crypto-1.0.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-core-1.9.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/curator-recipes-2.7.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hk2-locator-2.4.0-b34.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/guava-14.0.1.jar":"System Classpath","/export/home/e
 dlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-jaxrs-1.9.13.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-core_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jetty-sslengine-6.1.26.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-network-common_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-launcher_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/json4s-ast_2.11-3.5.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/antlr4-runtime-4.7.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jetty-util-6.1.26.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jaxb-api-2.2.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4
 .51/jars/commons-io-2.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-encoding-1.8.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/httpcore-4.4.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/macro-compat_2.11-1.1.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jackson-xc-1.9.13.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/xbean-asm5-shaded-4.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/breeze-macros_2.11-0.13.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/protobuf-java-2.5.0.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/json4s-scalap_2.11-3.5.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-mllib_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/expo
 rt/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-configuration-1.6.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-compress-1.4.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/json4s-core_2.11-3.5.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/orc-mapreduce-1.4.3-nohive.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/ST4-4.0.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/calcite-core-1.2.0-incubating.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-mapreduce-client-shuffle-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-common-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-repl_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-
 SNAPSHOT-bin-2.7.4.51/jars/jersey-container-servlet-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/opencsv-2.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-logging-1.1.3.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/shapeless_2.11-2.3.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-cli-1.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-client-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-yarn-common-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hadoop-hdfs-2.7.4.51.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/log4j-1.2.17.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-column-1.8.2.jar":"System Classpath","
 /export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/hive-metastore-1.2.1.spark2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/RoaringBitmap-0.5.11.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/chill_2.11-0.8.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jersey-container-servlet-core-2.22.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/stringtemplate-3.2.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/parquet-common-1.8.2.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-network-shuffle_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/spark-kvstore_2.11-2.4.0-SNAPSHOT.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/stax-api-1.0-2.jar":"System Classpath","/export/home/edlu/spark
 -2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jta-1.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/javassist-3.18.1-GA.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/commons-httpclient-3.1.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jets3t-0.9.4.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/apache-log4j-extras-1.2.17.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/metrics-json-3.1.5.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/bcprov-jdk15on-1.58.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/oro-2.0.8.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/bonecp-0.8.0.RELEASE.jar":"System Classpath","/export/home/edlu/spark-2.4.0-SNAPSHOT-bin-2.7.4.51/jars/jsp-api-2.1.jar":"System Classpath","/export/home/edlu/spark-2.4
 .0-SNAPSHOT-bin-2.7.4.51/jars/scala-parser-combinators_2.11-1.0.4.jar":"System Classpath"}}
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"application_1506645932520_24630151","Timestamp":1524182082734,"User":"edlu"}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1524182111695,"Executor ID":"1","Executor Info":{"Host":"node1404.grid.company.com","Total Cores":1,"Log Urls":{"stdout":"http://node1404.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000002/edlu/stdout?start=-4096","stderr":"http://node1404.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000002/edlu/stderr?start=-4096"}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"node1404.grid.company.com","Port":34043},"Maximum Memory":956615884,"Timestamp":1524182111795,"Maximum Onheap Memory":956615884,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1524182112088,"Executor ID":"3","Executor Info":{"Host":"node0998.grid.company.com","Total Cores":1,"Log Urls":{"stdout":"http://node0998.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000005/edlu/stdout?start=-4096","stderr":"http://node0998.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000005/edlu/stderr?start=-4096"}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"node0998.grid.company.com","Port":45265},"Maximum Memory":956615884,"Timestamp":1524182112208,"Maximum Onheap Memory":956615884,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1524182112278,"Executor ID":"4","Executor Info":{"Host":"node4243.grid.company.com","Total Cores":1,"Log Urls":{"stdout":"http://node4243.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000006/edlu/stdout?start=-4096","stderr":"http://node4243.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000006/edlu/stderr?start=-4096"}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"4","Host":"node4243.grid.company.com","Port":16084},"Maximum Memory":956615884,"Timestamp":1524182112408,"Maximum Onheap Memory":956615884,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1524182112471,"Executor ID":"2","Executor Info":{"Host":"node4045.grid.company.com","Total Cores":1,"Log Urls":{"stdout":"http://node4045.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000004/edlu/stdout?start=-4096","stderr":"http://node4045.grid.company.com:8042/node/containerlogs/container_e05_1523494505172_1552404_01_000004/edlu/stderr?start=-4096"}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"node4045.grid.company.com","Port":29262},"Maximum Memory":956615884,"Timestamp":1524182112578,"Maximum Onheap Memory":956615884,"Maximum Offheap Memory":0}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":0,"description":"createOrReplaceTempView at <console>:40","details":"org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:3033)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:47)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:49)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:51)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:53)\n$line44.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:55)\n$line44.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:57)\n$line44.$read$$iw$$iw$$iw$$iw.<init>(<console>:59)\n$line44.$read$$iw$$iw$$iw.<init>(<console>:61)\n$line44.$read$$iw$$iw.<init>(<console>:63)\n$line44.$read$$iw.<init>(<console>:65)\n$line44.$read.<init>
 (<console>:67)\n$line44.$read$.<init>(<console>:71)\n$line44.$read$.<clinit>(<console>)\n$line44.$eval$.$print$lzycompute(<console>:7)\n$line44.$eval$.$print(<console>:6)\n$line44.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","physicalPlanDescription":"== Parsed Logical Plan ==\nCreateViewCommand `apps`, false, true, LocalTempView\n   +- AnalysisBarrier\n         +- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n            +- Relation[appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] avro\n\n== Analyzed Logical Plan ==\nCreateViewCommand `apps`, false, true, LocalTempView\n   +- AnalysisBarrier\n         +- Project [a
 ppId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n            +- Relation[appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] avro\n\n== Optimized Logical Plan ==\nCreateViewCommand `apps`, false, true, LocalTempView\n   +- AnalysisBarrier\n         +- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n            +- Relation[appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#
 12L,appSparkVersion#13] avro\n\n== Physical Plan ==\nExecute CreateViewCommand\n   +- CreateViewCommand `apps`, false, true, LocalTempView\n         +- AnalysisBarrier\n               +- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n                  +- Relation[appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] avro","sparkPlanInfo":{"nodeName":"Execute CreateViewCommand","simpleString":"Execute CreateViewCommand","children":[],"metrics":[]},"time":1524182125829}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":0,"time":1524182125832}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":1,"description":"createOrReplaceTempView at <console>:40","details":"org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:3033)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:47)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:49)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:51)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:53)\n$line48.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:55)\n$line48.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:57)\n$line48.$read$$iw$$iw$$iw$$iw.<init>(<console>:59)\n$line48.$read$$iw$$iw$$iw.<init>(<console>:61)\n$line48.$read$$iw$$iw.<init>(<console>:63)\n$line48.$read$$iw.<init>(<console>:65)\n$line48.$read.<init>
 (<console>:67)\n$line48.$read$.<init>(<console>:71)\n$line48.$read$.<clinit>(<console>)\n$line48.$eval$.$print$lzycompute(<console>:7)\n$line48.$eval$.$print(<console>:6)\n$line48.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","physicalPlanDescription":"== Parsed Logical Plan ==\nCreateViewCommand `sys_props`, false, true, LocalTempView\n   +- AnalysisBarrier\n         +- Aggregate [appId#137], [appId#137, first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else cast(null as string), true) AS azkaban.link.workflow.url#159, first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else cast(null as string), true) AS azkaban.link.execution.url#161, first(if ((key#148 <=> azkaban.link.job.url)) value#149 else cast(null as string), true) AS azkaban.link.job.url#163, first(if ((key#148 <=> user.name)) value#149 else cast(null as string), true) AS user.name#165]\n            +- Project [appId#137, col#145.key AS key#148, col#145.value AS
  value#149]\n               +- Project [appId#137, col#145]\n                  +- Generate explode(systemProperties#135), false, [col#145]\n                     +- Relation[runtime#133,sparkProperties#134,systemProperties#135,classpathEntries#136,appId#137,attemptId#138] avro\n\n== Analyzed Logical Plan ==\nCreateViewCommand `sys_props`, false, true, LocalTempView\n   +- AnalysisBarrier\n         +- Aggregate [appId#137], [appId#137, first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else cast(null as string), true) AS azkaban.link.workflow.url#159, first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else cast(null as string), true) AS azkaban.link.execution.url#161, first(if ((key#148 <=> azkaban.link.job.url)) value#149 else cast(null as string), true) AS azkaban.link.job.url#163, first(if ((key#148 <=> user.name)) value#149 else cast(null as string), true) AS user.name#165]\n            +- Project [appId#137, col#145.key AS key#148, col#145.value AS value#149]
 \n               +- Project [appId#137, col#145]\n                  +- Generate explode(systemProperties#135), false, [col#145]\n                     +- Relation[runtime#133,sparkProperties#134,systemProperties#135,classpathEntries#136,appId#137,attemptId#138] avro\n\n== Optimized Logical Plan ==\nCreateViewCommand `sys_props`, false, true, LocalTempView\n   +- AnalysisBarrier\n         +- Aggregate [appId#137], [appId#137, first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else cast(null as string), true) AS azkaban.link.workflow.url#159, first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else cast(null as string), true) AS azkaban.link.execution.url#161, first(if ((key#148 <=> azkaban.link.job.url)) value#149 else cast(null as string), true) AS azkaban.link.job.url#163, first(if ((key#148 <=> user.name)) value#149 else cast(null as string), true) AS user.name#165]\n            +- Project [appId#137, col#145.key AS key#148, col#145.value AS value#149]\n        
        +- Project [appId#137, col#145]\n                  +- Generate explode(systemProperties#135), false, [col#145]\n                     +- Relation[runtime#133,sparkProperties#134,systemProperties#135,classpathEntries#136,appId#137,attemptId#138] avro\n\n== Physical Plan ==\nExecute CreateViewCommand\n   +- CreateViewCommand `sys_props`, false, true, LocalTempView\n         +- AnalysisBarrier\n               +- Aggregate [appId#137], [appId#137, first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else cast(null as string), true) AS azkaban.link.workflow.url#159, first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else cast(null as string), true) AS azkaban.link.execution.url#161, first(if ((key#148 <=> azkaban.link.job.url)) value#149 else cast(null as string), true) AS azkaban.link.job.url#163, first(if ((key#148 <=> user.name)) value#149 else cast(null as string), true) AS user.name#165]\n                  +- Project [appId#137, col#145.key AS key#148, col#1
 45.value AS value#149]\n                     +- Project [appId#137, col#145]\n                        +- Generate explode(systemProperties#135), false, [col#145]\n                           +- Relation[runtime#133,sparkProperties#134,systemProperties#135,classpathEntries#136,appId#137,attemptId#138] avro","sparkPlanInfo":{"nodeName":"Execute CreateViewCommand","simpleString":"Execute CreateViewCommand","children":[],"metrics":[]},"time":1524182128463}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd","executionId":1,"time":1524182128463}
+{"Event":"org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart","executionId":2,"description":"show at <console>:40","details":"org.apache.spark.sql.Dataset.show(Dataset.scala:691)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:47)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:49)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:51)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:53)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:55)\n$line50.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:57)\n$line50.$read$$iw$$iw$$iw$$iw.<init>(<console>:59)\n$line50.$read$$iw$$iw$$iw.<init>(<console>:61)\n$line50.$read$$iw$$iw.<init>(<console>:63)\n$line50.$read$$iw.<init>(<console>:65)\n$line50.$read.<init>(<console>:67)\n$line50.$read$.<init>(<
 console>:71)\n$line50.$read$.<clinit>(<console>)\n$line50.$eval$.$print$lzycompute(<console>:7)\n$line50.$eval$.$print(<console>:6)\n$line50.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","physicalPlanDescription":"== Parsed Logical Plan ==\nGlobalLimit 21\n+- LocalLimit 21\n   +- AnalysisBarrier\n         +- Project [cast(appId#0 as string) AS appId#397, cast(attemptId#1 as string) AS attemptId#398, cast(name#2 as string) AS name#399, cast(mode#3 as string) AS mode#400, cast(completed#4 as string) AS completed#401, cast(duration#5L as string) AS duration#402, cast(endTime#6 as string) AS endTime#403, cast(endTimeEpoch#7L as string) AS endTimeEpoch#404, cast(lastUpdated#8 as string) AS lastUpdated#405, cast(lastUpdatedEpoch#9L as string) AS lastUpdatedEpoch#406, cast(sparkUser#10 as string) AS sparkUser#407, cast(startTime#11 as string) AS startTime#408, cast(startTimeEpoch#12L as string) AS startTimeEpoch#409, cast(appSparkVersion#13 as string
 ) AS appSparkVersion#410, cast(endDate#28 as string) AS endDate#411, cast(azkaban.link.workflow.url#159 as string) AS azkaban.link.workflow.url#412, cast(azkaban.link.execution.url#161 as string) AS azkaban.link.execution.url#413, cast(azkaban.link.job.url#163 as string) AS azkaban.link.job.url#414, cast(user.name#165 as string) AS user.name#415]\n            +- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n               +- Join LeftOuter, (appId#0 = appId#137)\n                  :- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS 
 endDate#28]\n                  :  +- Relation[appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] avro\n                  +- Aggregate [appId#137], [appId#137, first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else cast(null as string), true) AS azkaban.link.workflow.url#159, first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else cast(null as string), true) AS azkaban.link.execution.url#161, first(if ((key#148 <=> azkaban.link.job.url)) value#149 else cast(null as string), true) AS azkaban.link.job.url#163, first(if ((key#148 <=> user.name)) value#149 else cast(null as string), true) AS user.name#165]\n                     +- Project [appId#137, col#145.key AS key#148, col#145.value AS value#149]\n                        +- Project [appId#137, col#145]\n                           +- Generate explode(systemProperties#135), false, [c
 ol#145]\n                              +- Relation[runtime#133,sparkProperties#134,systemProperties#135,classpathEntries#136,appId#137,attemptId#138] avro\n\n== Analyzed Logical Plan ==\nappId: string, attemptId: string, name: string, mode: string, completed: string, duration: string, endTime: string, endTimeEpoch: string, lastUpdated: string, lastUpdatedEpoch: string, sparkUser: string, startTime: string, startTimeEpoch: string, appSparkVersion: string, endDate: string, azkaban.link.workflow.url: string, azkaban.link.execution.url: string, azkaban.link.job.url: string, user.name: string\nGlobalLimit 21\n+- LocalLimit 21\n   +- Project [cast(appId#0 as string) AS appId#397, cast(attemptId#1 as string) AS attemptId#398, cast(name#2 as string) AS name#399, cast(mode#3 as string) AS mode#400, cast(completed#4 as string) AS completed#401, cast(duration#5L as string) AS duration#402, cast(endTime#6 as string) AS endTime#403, cast(endTimeEpoch#7L as string) AS endTimeEpoch#404, cast(lastU
 pdated#8 as string) AS lastUpdated#405, cast(lastUpdatedEpoch#9L as string) AS lastUpdatedEpoch#406, cast(sparkUser#10 as string) AS sparkUser#407, cast(startTime#11 as string) AS startTime#408, cast(startTimeEpoch#12L as string) AS startTimeEpoch#409, cast(appSparkVersion#13 as string) AS appSparkVersion#410, cast(endDate#28 as string) AS endDate#411, cast(azkaban.link.workflow.url#159 as string) AS azkaban.link.workflow.url#412, cast(azkaban.link.execution.url#161 as string) AS azkaban.link.execution.url#413, cast(azkaban.link.job.url#163 as string) AS azkaban.link.job.url#414, cast(user.name#165 as string) AS user.name#415]\n      +- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n         +- Join LeftOuter, (a
 ppId#0 = appId#137)\n            :- Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n            :  +- Relation[appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] avro\n            +- Aggregate [appId#137], [appId#137, first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else cast(null as string), true) AS azkaban.link.workflow.url#159, first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else cast(null as string), true) AS azkaban.link.execution.url#161, first(if ((key#148 <=> azkaban.link.job.url)) value#149 else cast(null as string), true) AS azkaban.link.job.url#163, first(if ((key#148 <=> user.name)) value#149 else cast(null as string), tr
 ue) AS user.name#165]\n               +- Project [appId#137, col#145.key AS key#148, col#145.value AS value#149]\n                  +- Project [appId#137, col#145]\n                     +- Generate explode(systemProperties#135), false, [col#145]\n                        +- Relation[runtime#133,sparkProperties#134,systemProperties#135,classpathEntries#136,appId#137,attemptId#138] avro\n\n== Optimized Logical Plan ==\nGlobalLimit 21\n+- LocalLimit 21\n   +- Project [appId#0, attemptId#1, name#2, mode#3, cast(completed#4 as string) AS completed#401, cast(duration#5L as string) AS duration#402, endTime#6, cast(endTimeEpoch#7L as string) AS endTimeEpoch#404, lastUpdated#8, cast(lastUpdatedEpoch#9L as string) AS lastUpdatedEpoch#406, sparkUser#10, startTime#11, cast(startTimeEpoch#12L as string) AS startTimeEpoch#409, appSparkVersion#13, cast(endDate#28 as string) AS endDate#411, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n     
  +- InMemoryRelation [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n            +- *(5) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n               +- SortMergeJoin [appId#0], [appId#137], LeftOuter\n                  :- *(1) Sort [appId#0 ASC NULLS FIRST], false, 0\n                  :  +- Exchange hashpartitioning(appId#0, 200)\n                  :     +- InMemoryTableScan [appId#0
 , attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28]\n                  :           +- InMemoryRelation [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n                  :                 +- *(1) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n                  :                    +- *(1) FileScan avro [appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L
 ,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] Batched: false, Format: com.databricks.spark.avro.DefaultSource@7006b304, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<appId:string,attemptId:string,name:string,mode:string,completed:boolean,duration:bigint,en...\n                  +- SortAggregate(key=[appId#137], functions=[first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else null, true), first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else null, true), first(if ((key#148 <=> azkaban.link.job.url)) value#149 else null, true), first(if ((key#148 <=> user.name)) value#149 else null, true)], output=[appId#137, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165])\n                     +- *(4) Sort [appId#137 ASC NULLS FIRST], false, 0\n                        +- Exchange h
 ashpartitioning(appId#137, 200)\n                           +- SortAggregate(key=[appId#137], functions=[partial_first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else null, true), partial_first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else null, true), partial_first(if ((key#148 <=> azkaban.link.job.url)) value#149 else null, true), partial_first(if ((key#148 <=> user.name)) value#149 else null, true)], output=[appId#137, first#273, valueSet#274, first#275, valueSet#276, first#277, valueSet#278, first#279, valueSet#280])\n                              +- *(3) Sort [appId#137 ASC NULLS FIRST], false, 0\n                                 +- *(3) Project [appId#137, col#145.key AS key#148, col#145.value AS value#149]\n                                    +- Generate explode(systemProperties#135), [appId#137], false, [col#145]\n                                       +- *(2) FileScan avro [systemProperties#135,appId#137] Batched: false, Format: com.databricks.spa
 rk.avro.DefaultSource@485d3d1, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<systemProperties:array<struct<key:string,value:string>>,appId:string>\n\n== Physical Plan ==\nCollectLimit 21\n+- *(1) LocalLimit 21\n   +- *(1) Project [appId#0, attemptId#1, name#2, mode#3, cast(completed#4 as string) AS completed#401, cast(duration#5L as string) AS duration#402, endTime#6, cast(endTimeEpoch#7L as string) AS endTimeEpoch#404, lastUpdated#8, cast(lastUpdatedEpoch#9L as string) AS lastUpdatedEpoch#406, sparkUser#10, startTime#11, cast(startTimeEpoch#12L as string) AS startTimeEpoch#409, appSparkVersion#13, cast(endDate#28 as string) AS endDate#411, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n      +- InMemoryTableScan [appId#0, appSparkVersion#13, attemptId#1, azkaban.link.execution.url#161, azkaban.link.job.url
 #163, azkaban.link.workflow.url#159, completed#4, duration#5L, endDate#28, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, mode#3, name#2, sparkUser#10, startTime#11, startTimeEpoch#12L, user.name#165]\n            +- InMemoryRelation [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n                  +- *(5) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n                 
     +- SortMergeJoin [appId#0], [appId#137], LeftOuter\n                        :- *(1) Sort [appId#0 ASC NULLS FIRST], false, 0\n                        :  +- Exchange hashpartitioning(appId#0, 200)\n                        :     +- InMemoryTableScan [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28]\n                        :           +- InMemoryRelation [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)\n                        :                 +- *(1) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, s
 tartTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n                        :                    +- *(1) FileScan avro [appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] Batched: false, Format: com.databricks.spark.avro.DefaultSource@7006b304, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<appId:string,attemptId:string,name:string,mode:string,completed:boolean,duration:bigint,en...\n                        +- SortAggregate(key=[appId#137], functions=[first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else null, true), first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else null, true), first(if ((key#148 <=> azkaban.link.job.url)) value#149 else null, true), first(if ((key#
 148 <=> user.name)) value#149 else null, true)], output=[appId#137, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165])\n                           +- *(4) Sort [appId#137 ASC NULLS FIRST], false, 0\n                              +- Exchange hashpartitioning(appId#137, 200)\n                                 +- SortAggregate(key=[appId#137], functions=[partial_first(if ((key#148 <=> azkaban.link.workflow.url)) value#149 else null, true), partial_first(if ((key#148 <=> azkaban.link.execution.url)) value#149 else null, true), partial_first(if ((key#148 <=> azkaban.link.job.url)) value#149 else null, true), partial_first(if ((key#148 <=> user.name)) value#149 else null, true)], output=[appId#137, first#273, valueSet#274, first#275, valueSet#276, first#277, valueSet#278, first#279, valueSet#280])\n                                    +- *(3) Sort [appId#137 ASC NULLS FIRST], false, 0\n                                       +- *(3) Proje
 ct [appId#137, col#145.key AS key#148, col#145.value AS value#149]\n                                          +- Generate explode(systemProperties#135), [appId#137], false, [col#145]\n                                             +- *(2) FileScan avro [systemProperties#135,appId#137] Batched: false, Format: com.databricks.spark.avro.DefaultSource@485d3d1, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<systemProperties:array<struct<key:string,value:string>>,appId:string>","sparkPlanInfo":{"nodeName":"CollectLimit","simpleString":"CollectLimit 21","children":[{"nodeName":"WholeStageCodegen","simpleString":"WholeStageCodegen","children":[{"nodeName":"LocalLimit","simpleString":"LocalLimit 21","children":[{"nodeName":"Project","simpleString":"Project [appId#0, attemptId#1, name#2, mode#3, cast(completed#4 as string) AS completed#401, cast(duration#5L as string) AS durat
 ion#402, endTime#6, cast(endTimeEpoch#7L as string) AS endTimeEpoch#404, lastUpdated#8, cast(lastUpdatedEpoch#9L as string) AS lastUpdatedEpoch#406, sparkUser#10, startTime#11, cast(startTimeEpoch#12L as string) AS startTimeEpoch#409, appSparkVersion#13, cast(endDate#28 as string) AS endDate#411, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]","children":[{"nodeName":"InputAdapter","simpleString":"InputAdapter","children":[{"nodeName":"InMemoryTableScan","simpleString":"InMemoryTableScan [appId#0, appSparkVersion#13, attemptId#1, azkaban.link.execution.url#161, azkaban.link.job.url#163, azkaban.link.workflow.url#159, completed#4, duration#5L, endDate#28, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, mode#3, name#2, sparkUser#10, startTime#11, startTimeEpoch#12L, user.name#165]","children":[],"metrics":[{"name":"number of output rows","accumulatorId":35,"metricType":"sum"},{"name":"scan time total (min, med, m
 ax)","accumulatorId":36,"metricType":"timing"}]}],"metrics":[]}],"metrics":[]}],"metrics":[]}],"metrics":[{"name":"duration total (min, med, max)","accumulatorId":34,"metricType":"timing"}]}],"metrics":[]},"time":1524182129952}
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1524182130194,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"cache at <console>:41","Number of Tasks":4,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"11\",\"name\":\"Exchange\"}","Callsite":"cache at <console>:41","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"FileScanRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:39","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":2,"Name":"*(1) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, spar
 kUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n+- *(1) FileScan avro [appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] Batched: false, Format: com.databricks.spark.avro.DefaultSource@7006b304, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<appId:string,attemptId:string,name:string,mode:string,completed:boolean,duration:bigint,en...\n","Scope":"{\"id\":\"3\",\"name\":\"mapPartitionsInternal\"}","Callsite":"cache at <console>:39","Parent IDs":[1],"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",
 \"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:39","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":5,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"12\",\"name\":\"InMemoryTableScan\"}","Callsite":"cache at <console>:41","Parent IDs":[4],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":4,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"12\",\"name\":\"InMemoryTableScan\"}","Callsite":"cache at <console>:41","Parent IDs":[2],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.sql.Dataset.cache(Dataset.scala:2912
 )\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:52)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:54)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:56)\n$line49.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:58)\n$line49.$read$$iw$$iw$$iw$$iw.<init>(<console>:60)\n$line49.$read$$iw$$iw$$iw.<init>(<console>:62)\n$line49.$read$$iw$$iw.<init>(<console>:64)\n$line49.$read$$iw.<init>(<console>:66)\n$line49.$read.<init>(<console>:68)\n$line49.$read$.<init>(<console>:72)\n$line49.$read$.<clinit>(<console>)\n$line49.$eval$.$print$lzycompute(<console>:7)\n$line49.$eval$.$print(<console>:6)\n$line49.$eval.$print(<console>)\nsun.reflect.NativeMethodA
 ccessorImpl.invoke0(Native Method)","Accumulables":[]},{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"cache at <console>:41","Number of Tasks":4,"RDD Info":[{"RDD ID":14,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"17\",\"name\":\"Exchange\"}","Callsite":"cache at <console>:41","Parent IDs":[13],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":10,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"24\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[9],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":9,"Name":"FileScanRDD","Scope":"{\"id\":\"24\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":fals
 e,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":12,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"19\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[11],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"23\",\"name\":\"Generate\"}","Callsite":"cache at <console>:41","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":13,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"18\",\"name\":\"SortAggregate\"}","Callsite":"cache at <console>:41","Parent IDs":[12],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Re
 plication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:52)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:54)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:56)\n$line49.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:58)\n$line49.$read$$iw$$iw$$iw$$iw.<init>(<console>:60)\n$line49.$read$$iw$$iw$$iw.<init>(<console>:62)\n$line49.$read$$iw$$iw.<init>(<console>:64)\n$line49.$read$$iw.<init>(<console>:66)\n$line49.$read.<init>(<console>:68)\n$line49.$read$.<init>(<console>
 :72)\n$line49.$read$.<clinit>(<console>)\n$line49.$eval$.$print$lzycompute(<console>:7)\n$line49.$eval$.$print(<console>:6)\n$line49.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Accumulables":[]},{"Stage ID":2,"Stage Attempt ID":0,"Stage Name":"show at <console>:40","Number of Tasks":1,"RDD Info":[{"RDD ID":26,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"33\",\"name\":\"map\"}","Callsite":"show at <console>:40","Parent IDs":[25],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":25,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"32\",\"name\":\"mapPartitionsInternal\"}","Callsite":"show at <console>:40","Parent IDs":[24],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID
 ":8,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"8\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[7],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":24,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"27\",\"name\":\"WholeStageCodegen\"}","Callsite":"show at <console>:40","Parent IDs":[23],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":22,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"31\",\"name\":\"InMemoryTableScan\"}","Callsite":"show at <console>:40","Parent IDs":[20],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":20,"Name":
 "*(5) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28, azkaban.link.workflow.url#159, azkaban.link.execution.url#161, azkaban.link.job.url#163, user.name#165]\n+- SortMergeJoin [appId#0], [appId#137], LeftOuter\n   :- *(1) Sort [appId#0 ASC NULLS FIRST], false, 0\n   :  +- Exchange hashpartitioning(appId#0, 200)\n   :     +- InMemoryTableScan [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28]\n   :           +- InMemoryRelation [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#12L, appSparkVersion#13, endDate#28], true, 10000, StorageLevel(dis
 k, memory, deserialized, 1 rep...","Scope":"{\"id\":\"26\",\"name\":\"mapPartitionsInternal\"}","Callsite":"cache at <console>:41","Parent IDs":[19],"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":23,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"31\",\"name\":\"InMemoryTableScan\"}","Callsite":"show at <console>:40","Parent IDs":[22],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":18,"Name":"ZippedPartitionsRDD2","Scope":"{\"id\":\"7\",\"name\":\"SortMergeJoin\"}","Callsite":"cache at <console>:41","Parent IDs":[8,17],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":
 17,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"13\",\"name\":\"SortAggregate\"}","Callsite":"cache at <console>:41","Parent IDs":[16],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":7,"Name":"ShuffledRowRDD","Scope":"{\"id\":\"11\",\"name\":\"Exchange\"}","Callsite":"cache at <console>:41","Parent IDs":[6],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":16,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"14\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[15],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":15,"Name":"ShuffledRowRD
 D","Scope":"{\"id\":\"17\",\"name\":\"Exchange\"}","Callsite":"cache at <console>:41","Parent IDs":[14],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":19,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"4\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[18],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":200,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[0,1],"Details":"org.apache.spark.sql.Dataset.show(Dataset.scala:691)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:47)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<
 console>:49)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:51)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:53)\n$line50.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:55)\n$line50.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:57)\n$line50.$read$$iw$$iw$$iw$$iw.<init>(<console>:59)\n$line50.$read$$iw$$iw$$iw.<init>(<console>:61)\n$line50.$read$$iw$$iw.<init>(<console>:63)\n$line50.$read$$iw.<init>(<console>:65)\n$line50.$read.<init>(<console>:67)\n$line50.$read$.<init>(<console>:71)\n$line50.$read$.<clinit>(<console>)\n$line50.$eval$.$print$lzycompute(<console>:7)\n$line50.$eval$.$print(<console>:6)\n$line50.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Accumulables":[]}],"Stage IDs":[0,1,2],"Properties":{"spark.sql.execution.id":"2"}}
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"cache at <console>:41","Number of Tasks":4,"RDD Info":[{"RDD ID":6,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"11\",\"name\":\"Exchange\"}","Callsite":"cache at <console>:41","Parent IDs":[5],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":0,"Name":"FileScanRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:39","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":2,"Name":"*(1) Project [appId#0, attemptId#1, name#2, mode#3, completed#4, duration#5L, endTime#6, endTimeEpoch#7L, lastUpdated#8, lastUpdatedEpoch#9L, sparkUser#10, startTime#11, startTimeEpoch#
 12L, appSparkVersion#13, cast(endTime#6 as date) AS endDate#28]\n+- *(1) FileScan avro [appId#0,attemptId#1,name#2,mode#3,completed#4,duration#5L,endTime#6,endTimeEpoch#7L,lastUpdated#8,lastUpdatedEpoch#9L,sparkUser#10,startTime#11,startTimeEpoch#12L,appSparkVersion#13] Batched: false, Format: com.databricks.spark.avro.DefaultSource@7006b304, Location: InMemoryFileIndex[hdfs://clusternn01.grid.company.com:9000/data/hadoopdev/sparkmetrics/ltx1-..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<appId:string,attemptId:string,name:string,mode:string,completed:boolean,duration:bigint,en...\n","Scope":"{\"id\":\"3\",\"name\":\"mapPartitionsInternal\"}","Callsite":"cache at <console>:39","Parent IDs":[1],"Storage Level":{"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"0\",\"name\":\"WholeStageCodegen\"}","Calls
 ite":"cache at <console>:39","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":5,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"12\",\"name\":\"InMemoryTableScan\"}","Callsite":"cache at <console>:41","Parent IDs":[4],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":4,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"12\",\"name\":\"InMemoryTableScan\"}","Callsite":"cache at <console>:41","Parent IDs":[2],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$i
 w$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:52)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:54)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:56)\n$line49.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:58)\n$line49.$read$$iw$$iw$$iw$$iw.<init>(<console>:60)\n$line49.$read$$iw$$iw$$iw.<init>(<console>:62)\n$line49.$read$$iw$$iw.<init>(<console>:64)\n$line49.$read$$iw.<init>(<console>:66)\n$line49.$read.<init>(<console>:68)\n$line49.$read$.<init>(<console>:72)\n$line49.$read$.<clinit>(<console>)\n$line49.$eval$.$print$lzycompute(<console>:7)\n$line49.$eval$.$print(<console>:6)\n$line49.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Su
 bmission Time":1524182130229,"Accumulables":[]},"Properties":{"spark.sql.execution.id":"2"}}
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":1,"Stage Attempt ID":0,"Stage Name":"cache at <console>:41","Number of Tasks":4,"RDD Info":[{"RDD ID":14,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"17\",\"name\":\"Exchange\"}","Callsite":"cache at <console>:41","Parent IDs":[13],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":10,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"24\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[9],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":9,"Name":"FileScanRDD","Scope":"{\"id\":\"24\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"
 Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":12,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"19\",\"name\":\"WholeStageCodegen\"}","Callsite":"cache at <console>:41","Parent IDs":[11],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":11,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"23\",\"name\":\"Generate\"}","Callsite":"cache at <console>:41","Parent IDs":[10],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID":13,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"18\",\"name\":\"SortAggregate\"}","Callsite":"cache at <console>:41","Parent IDs":[12],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Repli
 cation":1},"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"org.apache.spark.sql.Dataset.cache(Dataset.scala:2912)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:46)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:52)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:54)\n$line49.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:56)\n$line49.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:58)\n$line49.$read$$iw$$iw$$iw$$iw.<init>(<console>:60)\n$line49.$read$$iw$$iw$$iw.<init>(<console>:62)\n$line49.$read$$iw$$iw.<init>(<console>:64)\n$line49.$read$$iw.<init>(<console>:66)\n$line49.$read.<init>(<console>:68)\n$line49.$read$.<init>(<console>:72
 )\n$line49.$read$.<clinit>(<console>)\n$line49.$eval$.$print$lzycompute(<console>:7)\n$line49.$eval$.$print(<console>:6)\n$line49.$eval.$print(<console>)\nsun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)","Submission Time":1524182130328,"Accumulables":[]},"Properties":{"spark.sql.execution.id":"2"}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1524182130331,"Executor ID":"2","Host":"node4045.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1524182130349,"Executor ID":"3","Host":"node0998.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1524182130350,"Executor ID":"4","Host":"node4243.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1524182130350,"Executor ID":"1","Host":"node1404.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":0,"Attempt":0,"Launch Time":1524182142251,"Executor ID":"1","Host":"node1404.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1524182130350,"Executor ID":"1","Host":"node1404.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182142286,"Failed":false,"Killed":false,"Accumulables":[{"ID":7,"Name":"data size total (min, med, max)","Update":"154334487","Value":"154334486","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":16,"Name":"number of output rows","Update":"466636","Value":"466636","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":1,"Name":"number of output rows","Update":"466636","Value":"466636","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":5,"Name":"duration total (min, med, max)","Update":"19666","Value":"19665","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":59,"Name":"internal.metrics.in
 put.recordsRead","Update":466636,"Value":466636,"Internal":true,"Count Failed Values":true},{"ID":58,"Name":"internal.metrics.input.bytesRead","Update":37809697,"Value":37809697,"Internal":true,"Count Failed Values":true},{"ID":57,"Name":"internal.metrics.shuffle.write.writeTime","Update":91545212,"Value":91545212,"Internal":true,"Count Failed Values":true},{"ID":56,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":466636,"Value":466636,"Internal":true,"Count Failed Values":true},{"ID":55,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":20002743,"Value":20002743,"Internal":true,"Count Failed Values":true},{"ID":43,"Name":"internal.metrics.resultSerializationTime","Update":2,"Value":2,"Internal":true,"Count Failed Values":true},{"ID":42,"Name":"internal.metrics.jvmGCTime","Update":407,"Value":407,"Internal":true,"Count Failed Values":true},{"ID":41,"Name":"internal.metrics.resultSize","Update":1856,"Value":1856,"Internal":true,"Count Failed Values":true},{"
 ID":40,"Name":"internal.metrics.executorCpuTime","Update":9020410971,"Value":9020410971,"Internal":true,"Count Failed Values":true},{"ID":39,"Name":"internal.metrics.executorRunTime","Update":11146,"Value":11146,"Internal":true,"Count Failed Values":true},{"ID":38,"Name":"internal.metrics.executorDeserializeCpuTime","Update":574344183,"Value":574344183,"Internal":true,"Count Failed Values":true},{"ID":37,"Name":"internal.metrics.executorDeserializeTime","Update":714,"Value":714,"Internal":true,"Count Failed Values":true}]},"Task Metrics":{"Executor Deserialize Time":714,"Executor Deserialize CPU Time":574344183,"Executor Run Time":11146,"Executor CPU Time":9020410971,"Result Size":1856,"JVM GC Time":407,"Result Serialization Time":2,"Memory Bytes Spilled":0,"Disk Bytes Spilled":0,"Shuffle Read Metrics":{"Remote Blocks Fetched":0,"Local Blocks Fetched":0,"Fetch Wait Time":0,"Remote Bytes Read":0,"Remote Bytes Read To Disk":0,"Local Bytes Read":0,"Total Records Read":0},"Shuffle Write
  Metrics":{"Shuffle Bytes Written":20002743,"Shuffle Write Time":91545212,"Shuffle Records Written":466636},"Input Metrics":{"Bytes Read":37809697,"Records Read":466636},"Output Metrics":{"Bytes Written":0,"Records Written":0},"Updated Blocks":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":1,"Stage Attempt ID":0,"Task Info":{"Task ID":5,"Index":1,"Attempt":0,"Launch Time":1524182142997,"Executor ID":"4","Host":"node4243.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ShuffleMapTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1524182130350,"Executor ID":"4","Host":"node4243.grid.company.com","Locality":"ANY","Speculative":false,"Getting Result Time":0,"Finish Time":1524182143009,"Failed":false,"Killed":false,"Accumulables":[{"ID":7,"Name":"data size total (min, med, max)","Update":"206421303","Value":"360755789","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":16,"Name":"number of output rows","Update":"624246","Value":"1090882","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":1,"Name":"number of output rows","Update":"624246","Value":"1090882","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":5,"Name":"duration total (min, med, max)","Update":"20604","Value":"40269","Internal":true,"Count Failed Values":true,"Metadata":"sql"},{"ID":59,"Name":"internal.metrics.
 input.recordsRead","Update":624246,"Value":1090882,"Internal":true,"Count Failed Values":true},{"ID":58,"Name":"internal.metrics.input.bytesRead","Update":50423609,"Value":88233306,"Internal":true,"Count Failed Values":true},{"ID":57,"Name":"internal.metrics.shuffle.write.writeTime","Update":104125550,"Value":195670762,"Internal":true,"Count Failed Values":true},{"ID":56,"Name":"internal.metrics.shuffle.write.recordsWritten","Update":624246,"Value":1090882,"Internal":true,"Count Failed Values":true},{"ID":55,"Name":"internal.metrics.shuffle.write.bytesWritten","Update":26424033,"Value":46426776,"Internal":true,"Count Failed Values":true},{"ID":43,"Name":"internal.metrics.resultSerializationTime","Update":1,"Value":3,"Internal":true,"Count Failed Values":tr

<TRUNCATED>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org