You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sa...@apache.org on 2021/10/01 08:00:27 UTC

[spark] branch master updated: [SPARK-36038][CORE] Speculation metrics summary at stage level

This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 73747ec  [SPARK-36038][CORE] Speculation metrics summary at stage level
73747ec is described below

commit 73747ecb970595d49c478b0eb65f5132c8b0bf02
Author: Venkata krishnan Sowrirajan <vs...@linkedin.com>
AuthorDate: Fri Oct 1 16:59:29 2021 +0900

    [SPARK-36038][CORE] Speculation metrics summary at stage level
    
    ### What changes were proposed in this pull request?
    
    Currently there are no speculation metrics available for Spark either at application/job/stage level. This PR is to add some basic speculation metrics for a stage when speculative execution is enabled.
    
    This is similar to the existing stage level metrics tracking numTotal (total number of speculated tasks), numCompleted (total number of successful speculated tasks), numFailed (total number of failed speculated tasks), numKilled (total number of killed speculated tasks) etc.
    
    With this new set of metrics, it helps further understanding speculative execution feature in the context of the application and also helps in further tuning the speculative execution config knobs.
    
    Screenshot of Spark UI with speculation summary:
    ![Screen Shot 2021-09-22 at 12 12 20 PM](https://user-images.githubusercontent.com/8871522/135321311-db7699ad-f1ae-4729-afea-d1e2c4e86103.png)
    
    Screenshot of Spark UI with API output:
    ![Screen Shot 2021-09-22 at 12 10 37 PM](https://user-images.githubusercontent.com/8871522/135321486-4dbb7a67-5580-47f8-bccf-81c758c2e988.png)
    
    ### Why are the changes needed?
    
    Additional metrics for speculative execution.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests added and also deployed in our internal platform for quite some time now.
    
    Lead-authored by: Venkata krishnan Sowrirajan <vsowrirajan@linkedin.com>
    Co-authored by: Ron Hu <rhu@linkedin.com>
    Co-authored by: Thejdeep Gudivada <tgudivada@linkedin.com>
    
    Closes #33253 from venkata91/speculation-metrics.
    
    Authored-by: Venkata krishnan Sowrirajan <vs...@linkedin.com>
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
---
 .../org/apache/spark/ui/static/stagepage.js        |  29 ++
 .../spark/ui/static/stagespage-template.html       |  15 +
 .../resources/org/apache/spark/ui/static/webui.css |  10 +
 .../apache/spark/status/AppStatusListener.scala    |  19 +
 .../org/apache/spark/status/AppStatusStore.scala   |  11 +
 .../scala/org/apache/spark/status/LiveEntity.scala |  26 ++
 .../scala/org/apache/spark/status/api/v1/api.scala |   8 +
 .../scala/org/apache/spark/status/storeTypes.scala |  14 +
 .../scala/org/apache/spark/ui/jobs/JobPage.scala   |   1 +
 .../application_list_json_expectation.json         |  15 +
 .../completed_app_list_json_expectation.json       |  15 +
 .../limit_app_list_json_expectation.json           |  30 +-
 .../minDate_app_list_json_expectation.json         |  15 +
 .../minEndDate_app_list_json_expectation.json      |  15 +
 ...stage_with_speculation_summary_expectation.json | 507 +++++++++++++++++++++
 .../spark-events/application_1628109047826_1317105 |  52 +++
 .../spark/deploy/history/HistoryServerSuite.scala  |   5 +-
 .../spark/status/AppStatusListenerSuite.scala      |  10 +
 .../apache/spark/status/AppStatusStoreSuite.scala  |  57 ++-
 .../scala/org/apache/spark/ui/StagePageSuite.scala |   1 +
 dev/.rat-excludes                                  |   3 +-
 21 files changed, 840 insertions(+), 18 deletions(-)

diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js
index db1a148..595635a 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/stagepage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/stagepage.js
@@ -652,6 +652,35 @@ $(document).ready(function () {
             executorSummaryTableSelector.column(14).visible(dataToShow.showBytesSpilledData);
           });
 
+        // Prepare data for speculation metrics
+        $("#speculationSummaryTitle").hide()
+        $("#speculationSummary").hide()
+        var speculationSummaryInfo = responseBody.speculationSummary;
+        var speculationData = [[
+          speculationSummaryInfo.numTasks,
+          speculationSummaryInfo.numActiveTasks,
+          speculationSummaryInfo.numCompletedTasks,
+          speculationSummaryInfo.numFailedTasks,
+          speculationSummaryInfo.numKilledTasks
+        ]];
+        if (speculationSummaryInfo.numTasks > 0) {
+          // Show speculationSummary if at least one speculated task ran
+          $("#speculationSummaryTitle").show()
+          $("#speculationSummary").show()
+        }
+        var speculationMetricsTableConf = {
+          "data": speculationData,
+          "paging": false,
+          "searching": false,
+          "order": [[0, "asc"]],
+          "bSort": false,
+          "bAutoWidth": false,
+          "oLanguage": {
+            "sEmptyTable": "No speculation metrics yet"
+          }
+        }
+        $("#speculation-metrics-table").DataTable(speculationMetricsTableConf);
+
         // prepare data for accumulatorUpdates
         var accumulatorTable = responseBody.accumulatorUpdates.filter(accumUpdate =>
           !(accumUpdate.name).toString().includes("internal."));
diff --git a/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html b/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html
index 98e714f..8c47e5a 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html
+++ b/core/src/main/resources/org/apache/spark/ui/static/stagespage-template.html
@@ -31,6 +31,21 @@ limitations under the License.
             </tbody>
         </table>
     </div>
+    <h4 id="speculationSummaryTitle" class="title-table">Speculation Summary</h4>
+    <div id="speculationSummary" class="container-fluid">
+        <table id="speculation-metrics-table" class="table table-striped compact table-dataTable cell-border">
+            <thead>
+            <th>Total</th>
+            <th>Active</th>
+            <th>Complete</th>
+            <th>Failed</th>
+            <th>Killed</th>
+            </thead>
+            <tbody>
+            </tbody>
+        </table>
+        </div>
+    </div>
     <h4 id="aggregatedMetrics" class="collapse-table">
         <span class="expand-input-rate-arrow arrow-closed" id="arrowtoggle2"></span>
         <a class="title-table">Aggregated Metrics by Executor</a>
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index fad28ce..5ea9b78 100755
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -383,6 +383,16 @@ a.expandbutton {
   border-left: 1px solid #dddddd;
 }
 
+#speculation-metrics-table th {
+  border-top: 1px solid #dddddd;
+  border-bottom: 1px solid #dddddd;
+  border-right: 1px solid #dddddd;
+}
+
+#speculation-metrics-table th:first-child {
+  border-left: 1px solid #dddddd;
+}
+
 #summary-metrics-table th {
   border-top: 1px solid #dddddd;
   border-bottom: 1px solid #dddddd;
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index f9aaa7f..a76f0cc 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -600,6 +600,12 @@ private[spark] class AppStatusListener(
     liveUpdate(task, now)
 
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
+      if (event.taskInfo.speculative) {
+        stage.speculationStageSummary.numActiveTasks += 1
+        stage.speculationStageSummary.numTasks += 1
+      }
+      maybeUpdate(stage.speculationStageSummary, now)
+
       stage.activeTasks += 1
       stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
 
@@ -747,6 +753,19 @@ private[spark] class AppStatusListener(
         maybeUpdate(esummary, now)
       }
 
+      val speculationStageSummary = stage.speculationStageSummary
+      if (event.taskInfo.speculative) {
+        speculationStageSummary.numActiveTasks -= 1
+        speculationStageSummary.numCompletedTasks += completedDelta
+        speculationStageSummary.numFailedTasks += failedDelta
+        speculationStageSummary.numKilledTasks += killedDelta
+      }
+      if (isLastTask && event.taskInfo.speculative) {
+        update(speculationStageSummary, now)
+      } else {
+        maybeUpdate(speculationStageSummary, now)
+      }
+
       if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
         stage.cleaning = true
         kvstore.doAsync {
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index a8a16cd..6ba5a21 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -471,6 +471,11 @@ private[spark] class AppStatusStore(
       .asScala.map { exec => (exec.executorId -> exec.info) }.toMap
   }
 
+  def speculationSummary(stageId: Int, attemptId: Int): Option[v1.SpeculationStageSummary] = {
+    val stageKey = Array(stageId, attemptId)
+    asOption(store.read(classOf[SpeculationStageSummaryWrapper], stageKey).info)
+  }
+
   def rddList(cachedOnly: Boolean = true): Seq[v1.RDDStorageInfo] = {
     store.view(classOf[RDDStorageInfoWrapper]).asScala.map(_.info).filter { rdd =>
       !cachedOnly || rdd.numCachedPartitions > 0
@@ -524,6 +529,11 @@ private[spark] class AppStatusStore(
         } else {
           None
         }
+      val speculationStageSummary: Option[v1.SpeculationStageSummary] = if (withDetail) {
+        speculationSummary(stage.stageId, stage.attemptId)
+      } else {
+        None
+      }
 
       new v1.StageData(
         status = stage.status,
@@ -572,6 +582,7 @@ private[spark] class AppStatusStore(
         accumulatorUpdates = stage.accumulatorUpdates,
         tasks = tasks,
         executorSummary = executorSummaries,
+        speculationSummary = speculationStageSummary,
         killedTasksSummary = stage.killedTasksSummary,
         resourceProfileId = stage.resourceProfileId,
         peakExecutorMetrics = stage.peakExecutorMetrics,
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index b3dc367..b5c7375 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -392,6 +392,28 @@ private class LiveExecutorStageSummary(
 
 }
 
+private class LiveSpeculationStageSummary(
+    stageId: Int,
+    attemptId: Int) extends LiveEntity {
+
+  var numTasks = 0
+  var numActiveTasks = 0
+  var numCompletedTasks = 0
+  var numFailedTasks = 0
+  var numKilledTasks = 0
+
+  override protected def doUpdate(): Any = {
+    val info = new v1.SpeculationStageSummary(
+      numTasks,
+      numActiveTasks,
+      numCompletedTasks,
+      numFailedTasks,
+      numKilledTasks
+    )
+    new SpeculationStageSummaryWrapper(stageId, attemptId, info)
+  }
+}
+
 private class LiveStage(var info: StageInfo) extends LiveEntity {
 
   import LiveEntityHelpers._
@@ -426,6 +448,9 @@ private class LiveStage(var info: StageInfo) extends LiveEntity {
 
   val peakExecutorMetrics = new ExecutorMetrics()
 
+  lazy val speculationStageSummary: LiveSpeculationStageSummary =
+    new LiveSpeculationStageSummary(info.stageId, info.attemptNumber)
+
   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
   @volatile var cleaning = false
   val savedTasks = new AtomicInteger(0)
@@ -489,6 +514,7 @@ private class LiveStage(var info: StageInfo) extends LiveEntity {
       accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
       tasks = None,
       executorSummary = None,
+      speculationSummary = None,
       killedTasksSummary = killedSummary,
       resourceProfileId = info.resourceProfileId,
       peakExecutorMetrics = Some(peakExecutorMetrics).filter(_.isSet),
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 8c08232..86ddd3b 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -89,6 +89,13 @@ class ExecutorStageSummary private[spark](
     val peakMemoryMetrics: Option[ExecutorMetrics],
     val isExcludedForStage: Boolean)
 
+class SpeculationStageSummary private[spark](
+   val numTasks: Int,
+   val numActiveTasks: Int,
+   val numCompletedTasks: Int,
+   val numFailedTasks: Int,
+   val numKilledTasks: Int)
+
 class ExecutorSummary private[spark](
     val id: String,
     val hostPort: String,
@@ -288,6 +295,7 @@ class StageData private[spark](
     val accumulatorUpdates: Seq[AccumulableInfo],
     val tasks: Option[Map[Long, TaskData]],
     val executorSummary: Option[Map[String, ExecutorStageSummary]],
+    val speculationSummary: Option[SpeculationStageSummary],
     val killedTasksSummary: Map[String, Int],
     val resourceProfileId: Int,
     @JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index b258bdb..b7b81e0 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -399,6 +399,20 @@ private[spark] class ExecutorStageSummaryWrapper(
 
 }
 
+private[spark] class SpeculationStageSummaryWrapper(
+    val stageId: Int,
+    val stageAttemptId: Int,
+    val info: SpeculationStageSummary) {
+
+  @JsonIgnore @KVIndex
+  private val _id: Array[Int] = Array(stageId, stageAttemptId)
+
+  @JsonIgnore @KVIndex("stage")
+  private def stage: Array[Int] = Array(stageId, stageAttemptId)
+
+  private[this] val id: Array[Int] = _id
+}
+
 private[spark] class StreamBlockData(
   val name: String,
   val executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 08e8652..1de000b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -288,6 +288,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
           accumulatorUpdates = Nil,
           tasks = None,
           executorSummary = None,
+          speculationSummary = None,
           killedTasksSummary = Map(),
           ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID,
           peakExecutorMetrics = None,
diff --git a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
index 06015ec..ae1edbc 100644
--- a/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/application_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1628109047826_1317105",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2021-08-10T23:24:55.333GMT",
+    "endTime" : "2021-08-10T23:29:30.208GMT",
+    "lastUpdated" : "",
+    "duration" : 274875,
+    "sparkUser" : "john",
+    "completed" : true,
+    "appSparkVersion" : "3.1.1.119",
+    "startTimeEpoch" : 1628637895333,
+    "endTimeEpoch" : 1628638170208,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "app-20200706201101-0003",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
index 06015ec..ae1edbc 100644
--- a/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/completed_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1628109047826_1317105",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2021-08-10T23:24:55.333GMT",
+    "endTime" : "2021-08-10T23:29:30.208GMT",
+    "lastUpdated" : "",
+    "duration" : 274875,
+    "sparkUser" : "john",
+    "completed" : true,
+    "appSparkVersion" : "3.1.1.119",
+    "startTimeEpoch" : 1628637895333,
+    "endTimeEpoch" : 1628638170208,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "app-20200706201101-0003",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
index 8e6be68..91e3ebd 100644
--- a/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/limit_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1628109047826_1317105",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2021-08-10T23:24:55.333GMT",
+    "endTime" : "2021-08-10T23:29:30.208GMT",
+    "lastUpdated" : "",
+    "duration" : 274875,
+    "sparkUser" : "john",
+    "completed" : true,
+    "appSparkVersion" : "3.1.1.119",
+    "startTimeEpoch" : 1628637895333,
+    "endTimeEpoch" : 1628638170208,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "app-20200706201101-0003",
   "name" : "Spark shell",
   "attempts" : [ {
@@ -28,19 +43,4 @@
     "startTimeEpoch" : 1578764662851,
     "lastUpdatedEpoch" : 0
   } ]
-}, {
-  "id" : "application_1555004656427_0144",
-  "name" : "Spark shell",
-  "attempts" : [ {
-    "startTime" : "2019-07-02T21:02:17.180GMT",
-    "endTime" : "2019-07-02T21:02:35.974GMT",
-    "lastUpdated" : "",
-    "duration" : 18794,
-    "sparkUser" : "tgraves",
-    "completed" : true,
-    "appSparkVersion" : "3.0.0-SNAPSHOT",
-    "startTimeEpoch" : 1562101337180,
-    "lastUpdatedEpoch" : 0,
-    "endTimeEpoch" : 1562101355974
-  } ]
 } ]
diff --git a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
index 35d71f9..9885d36 100644
--- a/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1628109047826_1317105",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2021-08-10T23:24:55.333GMT",
+    "endTime" : "2021-08-10T23:29:30.208GMT",
+    "lastUpdated" : "",
+    "duration" : 274875,
+    "sparkUser" : "john",
+    "completed" : true,
+    "appSparkVersion" : "3.1.1.119",
+    "startTimeEpoch" : 1628637895333,
+    "endTimeEpoch" : 1628638170208,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "app-20200706201101-0003",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
index c6530b1..cf0bad7 100644
--- a/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/minEndDate_app_list_json_expectation.json
@@ -1,4 +1,19 @@
 [ {
+  "id" : "application_1628109047826_1317105",
+  "name" : "Spark shell",
+  "attempts" : [ {
+    "startTime" : "2021-08-10T23:24:55.333GMT",
+    "endTime" : "2021-08-10T23:29:30.208GMT",
+    "lastUpdated" : "",
+    "duration" : 274875,
+    "sparkUser" : "john",
+    "completed" : true,
+    "appSparkVersion" : "3.1.1.119",
+    "startTimeEpoch" : 1628637895333,
+    "endTimeEpoch" : 1628638170208,
+    "lastUpdatedEpoch" : 0
+  } ]
+}, {
   "id" : "app-20200706201101-0003",
   "name" : "Spark shell",
   "attempts" : [ {
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_with_speculation_summary_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_with_speculation_summary_expectation.json
new file mode 100644
index 0000000..5f6090d
--- /dev/null
+++ b/core/src/test/resources/HistoryServerExpectations/stage_with_speculation_summary_expectation.json
@@ -0,0 +1,507 @@
+{
+  "status" : "COMPLETE",
+  "stageId" : 0,
+  "attemptId" : 0,
+  "numTasks" : 4,
+  "numActiveTasks" : 0,
+  "numCompleteTasks" : 4,
+  "numFailedTasks" : 0,
+  "numKilledTasks" : 1,
+  "numCompletedIndices" : 4,
+  "submissionTime" : "2021-08-10T23:27:53.488GMT",
+  "firstTaskLaunchedTime" : "2021-08-10T23:27:53.885GMT",
+  "completionTime" : "2021-08-10T23:28:57.679GMT",
+  "executorDeserializeTime" : 12793,
+  "executorDeserializeCpuTime" : 5317155711,
+  "executorRunTime" : 113648,
+  "executorCpuTime" : 284330976,
+  "resultSize" : 3360,
+  "jvmGcTime" : 0,
+  "resultSerializationTime" : 4,
+  "memoryBytesSpilled" : 0,
+  "diskBytesSpilled" : 0,
+  "peakExecutionMemory" : 0,
+  "inputBytes" : 0,
+  "inputRecords" : 0,
+  "outputBytes" : 0,
+  "outputRecords" : 0,
+  "shuffleRemoteBlocksFetched" : 0,
+  "shuffleLocalBlocksFetched" : 0,
+  "shuffleFetchWaitTime" : 0,
+  "shuffleRemoteBytesRead" : 0,
+  "shuffleRemoteBytesReadToDisk" : 0,
+  "shuffleLocalBytesRead" : 0,
+  "shuffleReadBytes" : 0,
+  "shuffleReadRecords" : 0,
+  "shuffleWriteBytes" : 0,
+  "shuffleWriteTime" : 0,
+  "shuffleWriteRecords" : 0,
+  "name" : "collect at <console>:27",
+  "details" : "org.apache.spark.rdd.RDD.collect(RDD.scala:1029)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:27)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:36)\n$line17.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:38)\n$line17.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:40)\n$line17.$read$$iw$$iw$$iw$$iw.<init>(<console>:42)\n$line17.$read$$iw$$iw$$iw.<init>(<console>:44)\n$line17.$read$$iw$$iw.<init>(<console>:46)\n$line17.$read$$iw.<init>(<console>: [...]
+  "schedulingPool" : "default",
+  "rddIds" : [ 1, 0 ],
+  "accumulatorUpdates" : [ ],
+  "tasks" : {
+    "0" : {
+      "taskId" : 0,
+      "index" : 0,
+      "attempt" : 0,
+      "launchTime" : "2021-08-10T23:27:53.885GMT",
+      "duration" : 2234,
+      "executorId" : "7",
+      "host" : "host-12291",
+      "status" : "SUCCESS",
+      "taskLocality" : "PROCESS_LOCAL",
+      "speculative" : false,
+      "accumulatorUpdates" : [ ],
+      "taskMetrics" : {
+        "executorDeserializeTime" : 2048,
+        "executorDeserializeCpuTime" : 1171756284,
+        "executorRunTime" : 74,
+        "executorCpuTime" : 65263482,
+        "resultSize" : 840,
+        "jvmGcTime" : 0,
+        "resultSerializationTime" : 1,
+        "memoryBytesSpilled" : 0,
+        "diskBytesSpilled" : 0,
+        "peakExecutionMemory" : 0,
+        "inputMetrics" : {
+          "bytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "outputMetrics" : {
+          "bytesWritten" : 0,
+          "recordsWritten" : 0
+        },
+        "shuffleReadMetrics" : {
+          "remoteBlocksFetched" : 0,
+          "localBlocksFetched" : 0,
+          "fetchWaitTime" : 0,
+          "remoteBytesRead" : 0,
+          "remoteBytesReadToDisk" : 0,
+          "localBytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "shuffleWriteMetrics" : {
+          "bytesWritten" : 0,
+          "writeTime" : 0,
+          "recordsWritten" : 0
+        }
+      },
+      "executorLogs" : {
+        "stdout" : "http://host-12291:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000008/john/stdout?start=-4096",
+        "stderr" : "http://host-12291:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000008/john/stderr?start=-4096"
+      },
+      "schedulerDelay" : 111,
+      "gettingResultTime" : 0
+    },
+    "1" : {
+      "taskId" : 1,
+      "index" : 1,
+      "attempt" : 0,
+      "launchTime" : "2021-08-10T23:27:53.903GMT",
+      "duration" : 2647,
+      "executorId" : "5",
+      "host" : "host-5290",
+      "status" : "SUCCESS",
+      "taskLocality" : "PROCESS_LOCAL",
+      "speculative" : false,
+      "accumulatorUpdates" : [ ],
+      "taskMetrics" : {
+        "executorDeserializeTime" : 2474,
+        "executorDeserializeCpuTime" : 1460022429,
+        "executorRunTime" : 83,
+        "executorCpuTime" : 76317261,
+        "resultSize" : 840,
+        "jvmGcTime" : 0,
+        "resultSerializationTime" : 1,
+        "memoryBytesSpilled" : 0,
+        "diskBytesSpilled" : 0,
+        "peakExecutionMemory" : 0,
+        "inputMetrics" : {
+          "bytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "outputMetrics" : {
+          "bytesWritten" : 0,
+          "recordsWritten" : 0
+        },
+        "shuffleReadMetrics" : {
+          "remoteBlocksFetched" : 0,
+          "localBlocksFetched" : 0,
+          "fetchWaitTime" : 0,
+          "remoteBytesRead" : 0,
+          "remoteBytesReadToDisk" : 0,
+          "localBytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "shuffleWriteMetrics" : {
+          "bytesWritten" : 0,
+          "writeTime" : 0,
+          "recordsWritten" : 0
+        }
+      },
+      "executorLogs" : {
+        "stdout" : "http://host-5290:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000006/john/stdout?start=-4096",
+        "stderr" : "http://host-5290:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000006/john/stderr?start=-4096"
+      },
+      "schedulerDelay" : 89,
+      "gettingResultTime" : 0
+    },
+    "2" : {
+      "taskId" : 2,
+      "index" : 2,
+      "attempt" : 0,
+      "launchTime" : "2021-08-10T23:27:53.904GMT",
+      "duration" : 5124,
+      "executorId" : "8",
+      "host" : "host-25261",
+      "status" : "SUCCESS",
+      "taskLocality" : "PROCESS_LOCAL",
+      "speculative" : false,
+      "accumulatorUpdates" : [ ],
+      "taskMetrics" : {
+        "executorDeserializeTime" : 4731,
+        "executorDeserializeCpuTime" : 1363180019,
+        "executorRunTime" : 172,
+        "executorCpuTime" : 76094097,
+        "resultSize" : 840,
+        "jvmGcTime" : 0,
+        "resultSerializationTime" : 1,
+        "memoryBytesSpilled" : 0,
+        "diskBytesSpilled" : 0,
+        "peakExecutionMemory" : 0,
+        "inputMetrics" : {
+          "bytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "outputMetrics" : {
+          "bytesWritten" : 0,
+          "recordsWritten" : 0
+        },
+        "shuffleReadMetrics" : {
+          "remoteBlocksFetched" : 0,
+          "localBlocksFetched" : 0,
+          "fetchWaitTime" : 0,
+          "remoteBytesRead" : 0,
+          "remoteBytesReadToDisk" : 0,
+          "localBytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "shuffleWriteMetrics" : {
+          "bytesWritten" : 0,
+          "writeTime" : 0,
+          "recordsWritten" : 0
+        }
+      },
+      "executorLogs" : {
+        "stdout" : "http://host-25261:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000009/john/stdout?start=-4096",
+        "stderr" : "http://host-25261:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000009/john/stderr?start=-4096"
+      },
+      "schedulerDelay" : 220,
+      "gettingResultTime" : 0
+    },
+    "3" : {
+      "taskId" : 3,
+      "index" : 3,
+      "attempt" : 0,
+      "launchTime" : "2021-08-10T23:27:53.904GMT",
+      "duration" : 63773,
+      "executorId" : "10",
+      "host" : "host-12413",
+      "status" : "SUCCESS",
+      "taskLocality" : "PROCESS_LOCAL",
+      "speculative" : false,
+      "accumulatorUpdates" : [ ],
+      "taskMetrics" : {
+        "executorDeserializeTime" : 3540,
+        "executorDeserializeCpuTime" : 1322196979,
+        "executorRunTime" : 60141,
+        "executorCpuTime" : 66656136,
+        "resultSize" : 840,
+        "jvmGcTime" : 0,
+        "resultSerializationTime" : 1,
+        "memoryBytesSpilled" : 0,
+        "diskBytesSpilled" : 0,
+        "peakExecutionMemory" : 0,
+        "inputMetrics" : {
+          "bytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "outputMetrics" : {
+          "bytesWritten" : 0,
+          "recordsWritten" : 0
+        },
+        "shuffleReadMetrics" : {
+          "remoteBlocksFetched" : 0,
+          "localBlocksFetched" : 0,
+          "fetchWaitTime" : 0,
+          "remoteBytesRead" : 0,
+          "remoteBytesReadToDisk" : 0,
+          "localBytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "shuffleWriteMetrics" : {
+          "bytesWritten" : 0,
+          "writeTime" : 0,
+          "recordsWritten" : 0
+        }
+      },
+      "executorLogs" : {
+        "stdout" : "http://host-12413:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000011/john/stdout?start=-4096",
+        "stderr" : "http://host-12413:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000011/john/stderr?start=-4096"
+      },
+      "schedulerDelay" : 91,
+      "gettingResultTime" : 0
+    },
+    "4" : {
+      "taskId" : 4,
+      "index" : 3,
+      "attempt" : 1,
+      "launchTime" : "2021-08-10T23:28:04.499GMT",
+      "duration" : 53201,
+      "executorId" : "7",
+      "host" : "host-12291",
+      "status" : "KILLED",
+      "taskLocality" : "PROCESS_LOCAL",
+      "speculative" : true,
+      "accumulatorUpdates" : [ ],
+      "errorMessage" : "another attempt succeeded",
+      "taskMetrics" : {
+        "executorDeserializeTime" : 0,
+        "executorDeserializeCpuTime" : 0,
+        "executorRunTime" : 53178,
+        "executorCpuTime" : 0,
+        "resultSize" : 0,
+        "jvmGcTime" : 0,
+        "resultSerializationTime" : 0,
+        "memoryBytesSpilled" : 0,
+        "diskBytesSpilled" : 0,
+        "peakExecutionMemory" : 0,
+        "inputMetrics" : {
+          "bytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "outputMetrics" : {
+          "bytesWritten" : 0,
+          "recordsWritten" : 0
+        },
+        "shuffleReadMetrics" : {
+          "remoteBlocksFetched" : 0,
+          "localBlocksFetched" : 0,
+          "fetchWaitTime" : 0,
+          "remoteBytesRead" : 0,
+          "remoteBytesReadToDisk" : 0,
+          "localBytesRead" : 0,
+          "recordsRead" : 0
+        },
+        "shuffleWriteMetrics" : {
+          "bytesWritten" : 0,
+          "writeTime" : 0,
+          "recordsWritten" : 0
+        }
+      },
+      "executorLogs" : {
+        "stdout" : "http://host-12291:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000008/john/stdout?start=-4096",
+        "stderr" : "http://host-12291:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000008/john/stderr?start=-4096"
+      },
+      "schedulerDelay" : 23,
+      "gettingResultTime" : 0
+    }
+  },
+  "executorSummary" : {
+    "10" : {
+      "taskTime" : 63773,
+      "failedTasks" : 0,
+      "succeededTasks" : 1,
+      "killedTasks" : 0,
+      "inputBytes" : 0,
+      "inputRecords" : 0,
+      "outputBytes" : 0,
+      "outputRecords" : 0,
+      "shuffleRead" : 0,
+      "shuffleReadRecords" : 0,
+      "shuffleWrite" : 0,
+      "shuffleWriteRecords" : 0,
+      "memoryBytesSpilled" : 0,
+      "diskBytesSpilled" : 0,
+      "isBlacklistedForStage" : false,
+      "peakMemoryMetrics" : {
+        "JVMHeapMemory" : 229801896,
+        "JVMOffHeapMemory" : 63976024,
+        "OnHeapExecutionMemory" : 0,
+        "OffHeapExecutionMemory" : 0,
+        "OnHeapStorageMemory" : 4960,
+        "OffHeapStorageMemory" : 0,
+        "OnHeapUnifiedMemory" : 4960,
+        "OffHeapUnifiedMemory" : 0,
+        "DirectPoolMemory" : 8195,
+        "MappedPoolMemory" : 0,
+        "ProcessTreeJVMVMemory" : 0,
+        "ProcessTreeJVMRSSMemory" : 0,
+        "ProcessTreePythonVMemory" : 0,
+        "ProcessTreePythonRSSMemory" : 0,
+        "ProcessTreeOtherVMemory" : 0,
+        "ProcessTreeOtherRSSMemory" : 0,
+        "MinorGCCount" : 2,
+        "MinorGCTime" : 280,
+        "MajorGCCount" : 2,
+        "MajorGCTime" : 1116
+      },
+      "isExcludedForStage" : false
+    },
+    "5" : {
+      "taskTime" : 2647,
+      "failedTasks" : 0,
+      "succeededTasks" : 1,
+      "killedTasks" : 0,
+      "inputBytes" : 0,
+      "inputRecords" : 0,
+      "outputBytes" : 0,
+      "outputRecords" : 0,
+      "shuffleRead" : 0,
+      "shuffleReadRecords" : 0,
+      "shuffleWrite" : 0,
+      "shuffleWriteRecords" : 0,
+      "memoryBytesSpilled" : 0,
+      "diskBytesSpilled" : 0,
+      "isBlacklistedForStage" : false,
+      "peakMemoryMetrics" : {
+        "JVMHeapMemory" : 0,
+        "JVMOffHeapMemory" : 0,
+        "OnHeapExecutionMemory" : 0,
+        "OffHeapExecutionMemory" : 0,
+        "OnHeapStorageMemory" : 0,
+        "OffHeapStorageMemory" : 0,
+        "OnHeapUnifiedMemory" : 0,
+        "OffHeapUnifiedMemory" : 0,
+        "DirectPoolMemory" : 0,
+        "MappedPoolMemory" : 0,
+        "ProcessTreeJVMVMemory" : 0,
+        "ProcessTreeJVMRSSMemory" : 0,
+        "ProcessTreePythonVMemory" : 0,
+        "ProcessTreePythonRSSMemory" : 0,
+        "ProcessTreeOtherVMemory" : 0,
+        "ProcessTreeOtherRSSMemory" : 0,
+        "MinorGCCount" : 0,
+        "MinorGCTime" : 0,
+        "MajorGCCount" : 0,
+        "MajorGCTime" : 0
+      },
+      "isExcludedForStage" : false
+    },
+    "7" : {
+      "taskTime" : 55435,
+      "failedTasks" : 0,
+      "succeededTasks" : 1,
+      "killedTasks" : 1,
+      "inputBytes" : 0,
+      "inputRecords" : 0,
+      "outputBytes" : 0,
+      "outputRecords" : 0,
+      "shuffleRead" : 0,
+      "shuffleReadRecords" : 0,
+      "shuffleWrite" : 0,
+      "shuffleWriteRecords" : 0,
+      "memoryBytesSpilled" : 0,
+      "diskBytesSpilled" : 0,
+      "isBlacklistedForStage" : false,
+      "peakMemoryMetrics" : {
+        "JVMHeapMemory" : 321591096,
+        "JVMOffHeapMemory" : 66626584,
+        "OnHeapExecutionMemory" : 0,
+        "OffHeapExecutionMemory" : 0,
+        "OnHeapStorageMemory" : 4960,
+        "OffHeapStorageMemory" : 0,
+        "OnHeapUnifiedMemory" : 4960,
+        "OffHeapUnifiedMemory" : 0,
+        "DirectPoolMemory" : 10093,
+        "MappedPoolMemory" : 0,
+        "ProcessTreeJVMVMemory" : 0,
+        "ProcessTreeJVMRSSMemory" : 0,
+        "ProcessTreePythonVMemory" : 0,
+        "ProcessTreePythonRSSMemory" : 0,
+        "ProcessTreeOtherVMemory" : 0,
+        "ProcessTreeOtherRSSMemory" : 0,
+        "MinorGCCount" : 2,
+        "MinorGCTime" : 587,
+        "MajorGCCount" : 2,
+        "MajorGCTime" : 906
+      },
+      "isExcludedForStage" : false
+    },
+    "8" : {
+      "taskTime" : 5124,
+      "failedTasks" : 0,
+      "succeededTasks" : 1,
+      "killedTasks" : 0,
+      "inputBytes" : 0,
+      "inputRecords" : 0,
+      "outputBytes" : 0,
+      "outputRecords" : 0,
+      "shuffleRead" : 0,
+      "shuffleReadRecords" : 0,
+      "shuffleWrite" : 0,
+      "shuffleWriteRecords" : 0,
+      "memoryBytesSpilled" : 0,
+      "diskBytesSpilled" : 0,
+      "isBlacklistedForStage" : false,
+      "peakMemoryMetrics" : {
+        "JVMHeapMemory" : 0,
+        "JVMOffHeapMemory" : 0,
+        "OnHeapExecutionMemory" : 0,
+        "OffHeapExecutionMemory" : 0,
+        "OnHeapStorageMemory" : 0,
+        "OffHeapStorageMemory" : 0,
+        "OnHeapUnifiedMemory" : 0,
+        "OffHeapUnifiedMemory" : 0,
+        "DirectPoolMemory" : 0,
+        "MappedPoolMemory" : 0,
+        "ProcessTreeJVMVMemory" : 0,
+        "ProcessTreeJVMRSSMemory" : 0,
+        "ProcessTreePythonVMemory" : 0,
+        "ProcessTreePythonRSSMemory" : 0,
+        "ProcessTreeOtherVMemory" : 0,
+        "ProcessTreeOtherRSSMemory" : 0,
+        "MinorGCCount" : 0,
+        "MinorGCTime" : 0,
+        "MajorGCCount" : 0,
+        "MajorGCTime" : 0
+      },
+      "isExcludedForStage" : false
+    }
+  },
+  "speculationSummary" : {
+    "numTasks" : 1,
+    "numActiveTasks" : 0,
+    "numCompletedTasks" : 0,
+    "numFailedTasks" : 0,
+    "numKilledTasks" : 1
+  },
+  "killedTasksSummary" : {
+    "another attempt succeeded" : 1
+  },
+  "resourceProfileId" : 0,
+  "peakExecutorMetrics" : {
+    "JVMHeapMemory" : 321591096,
+    "JVMOffHeapMemory" : 66626584,
+    "OnHeapExecutionMemory" : 0,
+    "OffHeapExecutionMemory" : 0,
+    "OnHeapStorageMemory" : 4960,
+    "OffHeapStorageMemory" : 0,
+    "OnHeapUnifiedMemory" : 4960,
+    "OffHeapUnifiedMemory" : 0,
+    "DirectPoolMemory" : 10093,
+    "MappedPoolMemory" : 0,
+    "ProcessTreeJVMVMemory" : 0,
+    "ProcessTreeJVMRSSMemory" : 0,
+    "ProcessTreePythonVMemory" : 0,
+    "ProcessTreePythonRSSMemory" : 0,
+    "ProcessTreeOtherVMemory" : 0,
+    "ProcessTreeOtherRSSMemory" : 0,
+    "MinorGCCount" : 2,
+    "MinorGCTime" : 587,
+    "MajorGCCount" : 2,
+    "MajorGCTime" : 1116
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/resources/spark-events/application_1628109047826_1317105 b/core/src/test/resources/spark-events/application_1628109047826_1317105
new file mode 100644
index 0000000..0ecd50e
--- /dev/null
+++ b/core/src/test/resources/spark-events/application_1628109047826_1317105
@@ -0,0 +1,52 @@
+{"Event":"SparkListenerLogStart","Spark Version":"3.1.1.119"}
+{"Event":"SparkListenerResourceProfileAdded","Resource Profile Id":0,"Executor Resource Requests":{"cores":{"Resource Name":"cores","Amount":1,"Discovery Script":"","Vendor":""},"memory":{"Resource Name":"memory","Amount":4096,"Discovery Script":"","Vendor":""},"offHeap":{"Resource Name":"offHeap","Amount":0,"Discovery Script":"","Vendor":""}},"Task Resource Requests":{"cpus":{"Resource Name":"cpus","Amount":1.0}}}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"host-6467","Port":22948},"Maximum Memory":2101975449,"Timestamp":1628637929509,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerEnvironmentUpdate","JVM Information":{"Java Home":"/export/apps/jdk/JDK-1_8_0_172/jre","Java Version":"1.8.0_172 (Oracle Corporation)","Scala Version":"version 2.12.10"},"Spark Properties":{"spark.jars.ivySettings":"/export/apps/spark/commonconf/ivysettings.xml","spark.yarn.dist.archives":"hdfs:/share/lib/v1/spark/spark-conf-3.1.1.1176-hadooplibs-.tar.gz#__hadoop-site-libs__,hdfs:/share/lib/v1/spark/hive-libjars-1.1.0.232.tar.gz#__hive-bin__","spark.serializer":"or [...]
+{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"application_1628109047826_1317105","Timestamp":1628637895333,"User":"john"}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637966675,"Executor ID":"1","Executor Info":{"Host":"host-25253","Total Cores":1,"Log Urls":{"stdout":"http://host-25253:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000002/john/stdout?start=-4096","stderr":"http://host-25253:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000002/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-25253:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HT [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"1","Host":"host-25253","Port":33279},"Maximum Memory":2101975449,"Timestamp":1628637967153,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637968142,"Executor ID":"10","Executor Info":{"Host":"host-12413","Total Cores":1,"Log Urls":{"stdout":"http://host-12413:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000011/john/stdout?start=-4096","stderr":"http://host-12413:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000011/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-12413:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_H [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"10","Host":"host-12413","Port":15049},"Maximum Memory":2101975449,"Timestamp":1628637968579,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637974303,"Executor ID":"9","Executor Info":{"Host":"host-11741","Total Cores":1,"Log Urls":{"stdout":"http://host-11741:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000010/john/stdout?start=-4096","stderr":"http://host-11741:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000010/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-11741:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HT [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"9","Host":"host-11741","Port":26410},"Maximum Memory":2101975449,"Timestamp":1628637974894,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637975085,"Executor ID":"7","Executor Info":{"Host":"host-12291","Total Cores":1,"Log Urls":{"stdout":"http://host-12291:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000008/john/stdout?start=-4096","stderr":"http://host-12291:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000008/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-12291:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HT [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"7","Host":"host-12291","Port":29976},"Maximum Memory":2101975449,"Timestamp":1628637975365,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637975767,"Executor ID":"8","Executor Info":{"Host":"host-25261","Total Cores":1,"Log Urls":{"stdout":"http://host-25261:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000009/john/stdout?start=-4096","stderr":"http://host-25261:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000009/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-25261:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HT [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"8","Host":"host-25261","Port":5372},"Maximum Memory":2101975449,"Timestamp":1628637976198,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637976791,"Executor ID":"2","Executor Info":{"Host":"host-12328","Total Cores":1,"Log Urls":{"stdout":"http://host-12328:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000003/john/stdout?start=-4096","stderr":"http://host-12328:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000003/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-12328:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HT [...]
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637977242,"Executor ID":"3","Executor Info":{"Host":"host-5467","Total Cores":1,"Log Urls":{"stdout":"http://host-5467:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000004/john/stdout?start=-4096","stderr":"http://host-5467:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000004/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-5467:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HTTP_P [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"2","Host":"host-12328","Port":31993},"Maximum Memory":2101975449,"Timestamp":1628637977299,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"3","Host":"host-5467","Port":32829},"Maximum Memory":2101975449,"Timestamp":1628637977984,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637980554,"Executor ID":"6","Executor Info":{"Host":"host-4628","Total Cores":1,"Log Urls":{"stdout":"http://host-4628:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000007/john/stdout?start=-4096","stderr":"http://host-4628:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000007/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-4628:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HTTP_P [...]
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637980871,"Executor ID":"5","Executor Info":{"Host":"host-5290","Total Cores":1,"Log Urls":{"stdout":"http://host-5290:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000006/john/stdout?start=-4096","stderr":"http://host-5290:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000006/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-5290:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HTTP_P [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"6","Host":"host-4628","Port":16175},"Maximum Memory":2101975449,"Timestamp":1628637981331,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"5","Host":"host-5290","Port":14783},"Maximum Memory":2101975449,"Timestamp":1628637981501,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerExecutorAdded","Timestamp":1628637985360,"Executor ID":"4","Executor Info":{"Host":"host-4592","Total Cores":1,"Log Urls":{"stdout":"http://host-4592:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000005/john/stdout?start=-4096","stderr":"http://host-4592:8042/node/containerlogs/container_e18_1628109047826_1317105_01_000005/john/stderr?start=-4096"},"Attributes":{"NM_HTTP_ADDRESS":"host-4592:8042","USER":"john","LOG_FILES":"stderr,stdout","NM_HTTP_P [...]
+{"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"4","Host":"host-4592","Port":12238},"Maximum Memory":2101975449,"Timestamp":1628637986178,"Maximum Onheap Memory":2101975449,"Maximum Offheap Memory":0}
+{"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1628638073466,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:27","Number of Tasks":4,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:27","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":4,"Number of Cached Partitions":0," [...]
+{"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:27","Number of Tasks":4,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:27","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID" [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1628638073885,"Executor ID":"7","Host":"host-12291","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1628638073903,"Executor ID":"5","Host":"host-5290","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1628638073904,"Executor ID":"8","Host":"host-25261","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1628638073904,"Executor ID":"10","Host":"host-12413","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1628638073885,"Executor ID":"7","Host":"host-12291","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1628638076119,"Failed":false,"Killed":false,"Accumulables":[{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":2048,"Value":2048,"Internal":true,"Co [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":1,"Index":1,"Attempt":0,"Launch Time":1628638073903,"Executor ID":"5","Host":"host-5290","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1628638076550,"Failed":false,"Killed":false,"Accumulables":[{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":2474,"Value":4522,"Internal":true,"Cou [...]
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":2,"Index":2,"Attempt":0,"Launch Time":1628638073904,"Executor ID":"8","Host":"host-25261","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1628638079028,"Failed":false,"Killed":false,"Accumulables":[{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":4731,"Value":9253,"Internal":true,"Co [...]
+{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":4,"Index":3,"Attempt":1,"Launch Time":1628638084499,"Executor ID":"7","Host":"host-12291","Locality":"PROCESS_LOCAL","Speculative":true,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}
+{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"1","Host":"host-25253","Port":33279},"Timestamp":1628638118870}
+{"Event":"SparkListenerExecutorRemoved","Timestamp":1628638118895,"Executor ID":"1","Removed Reason":"Container container_e18_1628109047826_1317105_01_000002 exited from explicit termination request."}
+{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"9","Host":"host-11741","Port":26410},"Timestamp":1628638124987}
+{"Event":"SparkListenerExecutorRemoved","Timestamp":1628638124991,"Executor ID":"9","Removed Reason":"Container container_e18_1628109047826_1317105_01_000010 exited from explicit termination request."}
+{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"2","Host":"host-12328","Port":31993},"Timestamp":1628638129508}
+{"Event":"SparkListenerExecutorRemoved","Timestamp":1628638129514,"Executor ID":"2","Removed Reason":"Container container_e18_1628109047826_1317105_01_000003 exited from explicit termination request."}
+{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"3","Host":"host-5467","Port":32829},"Timestamp":1628638130967}
+{"Event":"SparkListenerExecutorRemoved","Timestamp":1628638130980,"Executor ID":"3","Removed Reason":"Container container_e18_1628109047826_1317105_01_000004 exited from explicit termination request."}
+{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"6","Host":"host-4628","Port":16175},"Timestamp":1628638133265}
+{"Event":"SparkListenerExecutorRemoved","Timestamp":1628638133274,"Executor ID":"6","Removed Reason":"Container container_e18_1628109047826_1317105_01_000007 exited from explicit termination request."}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"Success"},"Task Info":{"Task ID":3,"Index":3,"Attempt":0,"Launch Time":1628638073904,"Executor ID":"10","Host":"host-12413","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":1628638137677,"Failed":false,"Killed":false,"Accumulables":[{"ID":0,"Name":"internal.metrics.executorDeserializeTime","Update":3540,"Value":12793,"Internal":true," [...]
+{"Event":"SparkListenerStageCompleted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"collect at <console>:27","Number of Tasks":4,"RDD Info":[{"RDD ID":1,"Name":"MapPartitionsRDD","Scope":"{\"id\":\"1\",\"name\":\"map\"}","Callsite":"map at <console>:27","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Deserialized":false,"Replication":1},"Barrier":false,"Number of Partitions":4,"Number of Cached Partitions":0,"Memory Size":0,"Disk Size":0},{"RDD ID" [...]
+{"Event":"SparkListenerJobEnd","Job ID":0,"Completion Time":1628638137687,"Job Result":{"Result":"JobSucceeded"}}
+{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"ResultTask","Task End Reason":{"Reason":"TaskKilled","Kill Reason":"another attempt succeeded","Accumulator Updates":[{"ID":2,"Update":"53178","Internal":false,"Count Failed Values":true},{"ID":4,"Update":"0","Internal":false,"Count Failed Values":true}]},"Task Info":{"Task ID":4,"Index":3,"Attempt":1,"Launch Time":1628638084499,"Executor ID":"7","Host":"host-12291","Locality":"PROCESS_LOCAL","Speculative":tru [...]
+{"Event":"SparkListenerBlockManagerRemoved","Block Manager ID":{"Executor ID":"4","Host":"host-4592","Port":12238},"Timestamp":1628638139043}
+{"Event":"SparkListenerExecutorRemoved","Timestamp":1628638139048,"Executor ID":"4","Removed Reason":"Container container_e18_1628109047826_1317105_01_000005 exited from explicit termination request."}
+{"Event":"SparkListenerApplicationEnd","Timestamp":1628638170208}
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 734ad7b..71ab9e7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -192,7 +192,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // Enable "spark.eventLog.logBlockUpdates.enabled", to get the storage information
     // in the history server.
     "one rdd storage json" -> "applications/local-1422981780767/storage/rdd/0",
-    "miscellaneous process" -> "applications/application_1555004656427_0144/allmiscellaneousprocess"
+    "miscellaneous process" ->
+      "applications/application_1555004656427_0144/allmiscellaneousprocess",
+    "stage with speculation summary" ->
+      "applications/application_1628109047826_1317105/stages/0/0/"
   )
 
   // run a bunch of characterization tests -- just verify the behavior is the same as what is saved
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b2d3e0f..37fd816 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -344,6 +344,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(task.attempt === reattempt.attemptNumber)
     }
 
+    check[SpeculationStageSummaryWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numActiveTasks == 2)
+      assert(stage.info.numTasks == 2)
+    }
+
     // Kill one task, restart it.
     time += 1
     val killed = s1Tasks.drop(1).head
@@ -428,6 +433,11 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(stage.info.numCompleteTasks === pending.size)
     }
 
+    check[SpeculationStageSummaryWrapper](key(stages.head)) { stage =>
+      assert(stage.info.numCompletedTasks == 2)
+      assert(stage.info.numKilledTasks == 2)
+    }
+
     pending.foreach { task =>
       check[TaskDataWrapper](task.taskId) { wrapper =>
         assert(wrapper.errorMessage === None)
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 735e519..acd6c39 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -19,7 +19,10 @@ package org.apache.spark.status
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.scheduler.{TaskInfo, TaskLocality}
+import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.{SparkListenerStageSubmitted, SparkListenerTaskStart, StageInfo, TaskInfo, TaskLocality}
+import org.apache.spark.status.api.v1.SpeculationStageSummary
 import org.apache.spark.util.{Distribution, Utils}
 import org.apache.spark.util.kvstore._
 
@@ -136,6 +139,51 @@ class AppStatusStoreSuite extends SparkFunSuite {
     }
   }
 
+  test("SPARK-36038: speculation summary") {
+    val store = new InMemoryStore()
+    store.write(newSpeculationSummaryData(stageId, attemptId))
+
+    val appStore = new AppStatusStore(store)
+    val info = appStore.speculationSummary(stageId, attemptId)
+    assert(info.isDefined)
+    info.foreach { metric =>
+      assert(metric.numTasks == 10)
+      assert(metric.numActiveTasks == 2)
+      assert(metric.numCompletedTasks == 5)
+      assert(metric.numFailedTasks == 1)
+      assert(metric.numKilledTasks == 2)
+    }
+  }
+
+  test("SPARK-36038: speculation summary without any task completed") {
+    val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    val statusStore = AppStatusStore.createLiveStore(conf)
+
+    val listener = statusStore.listener.get
+
+    // Simulate a stage in job progress listener
+    val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = "dummy", numTasks = 1,
+      rddInfos = Seq.empty, parentIds = Seq.empty, details = "details",
+      resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    (1 to 2).foreach {
+      taskId =>
+        val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY,
+          false)
+        listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+        listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
+    }
+
+    assert(statusStore.speculationSummary(0, 0).isDefined)
+  }
+
+  test("SPARK-36038: speculation summary for unknown stages" +
+      " like SKIPPED stages should not fail with NoSuchElementException") {
+    val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
+    val statusStore = AppStatusStore.createLiveStore(conf)
+
+    assert(statusStore.speculationSummary(0, 0).isEmpty)
+  }
+
   private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
     val store = new InMemoryStore()
     val values = (0 until count).map { i =>
@@ -204,4 +252,11 @@ class AppStatusStoreSuite extends SparkFunSuite {
     taskMetrics.shuffleWriteMetrics.incRecordsWritten(i)
     taskMetrics
   }
+
+  private def newSpeculationSummaryData(
+      stageId: Int,
+      stageAttemptId: Int): SpeculationStageSummaryWrapper = {
+    val speculationStageSummary = new SpeculationStageSummary(10, 2, 5, 1, 2)
+    new SpeculationStageSummaryWrapper(stageId, stageAttemptId, speculationStageSummary)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 9f0b73f..31d8718 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -91,6 +91,7 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
         accumulatorUpdates = Seq(new UIAccumulableInfo(0L, "acc", None, "value")),
         tasks = None,
         executorSummary = None,
+        speculationSummary = None,
         killedTasksSummary = Map.empty,
         ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID,
         peakExecutorMetrics = None,
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index a35d4ce..8eccb89 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -125,6 +125,7 @@ application_1578436911597_0052
 config.properties
 local-1596020211915
 app-20200706201101-0003
+application_1628109047826_1317105
 py.typed
 _metadata
 _SUCCESS
@@ -135,4 +136,4 @@ over1k
 over10k
 exported_table/*
 ansible-for-test-node/*
-node_modules
\ No newline at end of file
+node_modules

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org