You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2018/12/04 19:01:05 UTC

spark git commit: [SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics

Repository: spark
Updated Branches:
  refs/heads/master 556d83e0d -> 35f9163ad


[SPARK-26119][CORE][WEBUI] Task summary table should contain only successful tasks' metrics

## What changes were proposed in this pull request?

The task summary table on the stage page currently displays a summary of all tasks. However, it should summarize only the successful tasks, to match the behavior of previous versions of Spark.

## How was this patch tested?
Added unit tests. Screenshots attached.
Before patch:
![screenshot from 2018-11-20 00-36-18](https://user-images.githubusercontent.com/23054875/48729339-62e3a580-ec5d-11e8-81f0-0d191a234ffe.png)

![screenshot from 2018-11-20 01-18-37](https://user-images.githubusercontent.com/23054875/48731112-41d18380-ec62-11e8-8c31-1ffbfa04e746.png)

Closes #23088 from shahidki31/summaryMetrics.

Authored-by: Shahid <sh...@gmail.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/35f9163a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/35f9163a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/35f9163a

Branch: refs/heads/master
Commit: 35f9163adf5c067229afbe57ed60d5dd5f2422c8
Parents: 556d83e
Author: Shahid <sh...@gmail.com>
Authored: Tue Dec 4 11:00:58 2018 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Dec 4 11:00:58 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusStore.scala    | 73 ++++++++++++++------
 .../spark/status/AppStatusStoreSuite.scala      | 33 ++++++++-
 2 files changed, 81 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/35f9163a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index 5c0ed4d..b35781c 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
-          .parent(stageKey)
-          .index(TaskIndexNames.EXEC_RUN_TIME)
-          .first(0L)
-          .closeableIterator()
+        if (store.isInstanceOf[InMemoryStore]) {
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(TaskIndexNames.STATUS)
+            .first("SUCCESS")
+            .last("SUCCESS")
+            .closeableIterator()
+        } else {
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(TaskIndexNames.EXEC_RUN_TIME)
+            .first(0L)
+            .closeableIterator()
+        }
       ) { it =>
         var _count = 0L
         while (it.hasNext()) {
@@ -221,30 +230,50 @@ private[spark] class AppStatusStore(
     // stabilize once the stage finishes. It's also slow, especially with disk stores.
     val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
 
+    // TODO: Summary metrics needs to display all the successful tasks' metrics (SPARK-26119).
+    // For InMemory case, it is efficient to find using the following code. But for diskStore case
+    // we need an efficient solution to avoid deserialization time overhead. For that, we need to
+    // rework on the way indexing works, so that we can index by specific metrics for successful
+    // and failed tasks differently (would be tricky). Also would require changing the disk store
+    // version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
+      if (store.isInstanceOf[InMemoryStore]) {
+        val quantileTasks = store.view(classOf[TaskDataWrapper])
           .parent(stageKey)
           .index(index)
           .first(0L)
-          .closeableIterator()
-      ) { it =>
-        var last = Double.NaN
-        var currentIdx = -1L
-        indices.map { idx =>
-          if (idx == currentIdx) {
-            last
-          } else {
-            val diff = idx - currentIdx
-            currentIdx = idx
-            if (it.skip(diff - 1)) {
-              last = fn(it.next()).toDouble
+          .asScala
+          .filter { _.status == "SUCCESS"} // Filter "SUCCESS" tasks
+          .toIndexedSeq
+
+        indices.map { index =>
+          fn(quantileTasks(index.toInt)).toDouble
+        }.toIndexedSeq
+      } else {
+        Utils.tryWithResource(
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(index)
+            .first(0L)
+            .closeableIterator()
+        ) { it =>
+          var last = Double.NaN
+          var currentIdx = -1L
+          indices.map { idx =>
+            if (idx == currentIdx) {
               last
             } else {
-              Double.NaN
+              val diff = idx - currentIdx
+              currentIdx = idx
+              if (it.skip(diff - 1)) {
+                last = fn(it.next()).toDouble
+                last
+              } else {
+                Double.NaN
+              }
             }
-          }
-        }.toIndexedSeq
+          }.toIndexedSeq
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/35f9163a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 92f90f3..75a6581 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }
 
+  test("only successfull task have taskSummary") {
+    val store = new InMemoryStore()
+    (0 until 5).foreach { i => store.write(newTaskData(i, status = "FAILED")) }
+    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
+    assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successfull tasks") {
+    val store = new InMemoryStore()
+
+    for (i <- 0 to 5) {
+      if (i % 2 == 1) {
+        store.write(newTaskData(i, status = "FAILED"))
+      } else {
+        store.write(newTaskData(i))
+      }
+    }
+
+    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+
+    val values = Array(0.0, 2.0, 4.0)
+
+    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+      assert(expected === actual)
+    }
+  }
+
   private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
     val store = new InMemoryStore()
     val values = (0 until count).map { i =>
@@ -93,12 +121,11 @@ class AppStatusStoreSuite extends SparkFunSuite {
     }
   }
 
-  private def newTaskData(i: Int): TaskDataWrapper = {
+  private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
     new TaskDataWrapper(
-      i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None,
+      i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
       i, i, i, i, i, i, i, i, i, i,
       i, i, i, i, i, i, i, i, i, i,
       i, i, i, i, stageId, attemptId)
   }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org