You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/03/27 22:47:19 UTC

[spark] branch branch-3.0 updated: [SPARK-31271][UI] fix web ui for driver side SQL metrics

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7c90ec0  [SPARK-31271][UI] fix web ui for driver side SQL metrics
7c90ec0 is described below

commit 7c90ec065f81c3933eef1f0dd172f1a518b1232b
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Mar 27 15:45:35 2020 -0700

    [SPARK-31271][UI] fix web ui for driver side SQL metrics
    
    ### What changes were proposed in this pull request?
    
    In https://github.com/apache/spark/pull/23551, we changed the metric type of driver-side SQL metrics to size/time etc., which come with max/min/median info.
    
    This doesn't make sense for driver-side SQL metrics, as they have only one value. It makes the web UI hard to read:
    ![image](https://user-images.githubusercontent.com/3182036/77653892-42db9900-6fab-11ea-8e7f-92f763fa32ff.png)
    
    This PR updates the SQL metrics UI to only display max/min/median if there is more than one metric value:
    ![image](https://user-images.githubusercontent.com/3182036/77653975-5f77d100-6fab-11ea-849e-64c935377c8e.png)
    
    ### Why are the changes needed?
    
    Makes the UI easier to read
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    manual test
    
    Closes #28037 from cloud-fan/ui.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit c4e98c065c99d2cf840e6006ee5414fbaaba9937)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/metric/SQLMetrics.scala    | 60 +++++++++++-----------
 .../spark/sql/execution/ui/SparkPlanGraph.scala    |  7 ++-
 .../sql/execution/metric/SQLMetricsSuite.scala     | 33 +++++++-----
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 12 ++---
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  9 ++--
 5 files changed, 68 insertions(+), 53 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1394e0f..92d2179 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,26 +116,23 @@ object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
   def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // The final result of this metric in physical operator UI may looks like:
-    // duration(min, med, max):
+    // duration total (min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
     val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
@@ -150,8 +147,7 @@ object SQLMetrics {
     // probe avg (min, med, max):
     // (1.2, 2.2, 6.3)
     val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
-      countFailedValues = false)
+    acc.register(sc, name = Some(name), countFailedValues = false)
     acc
   }
 
@@ -164,13 +160,15 @@ object SQLMetrics {
     metricsType != SUM_METRIC
   }
 
+  private val METRICS_NAME_SUFFIX = "(min, med, max (stageId: taskId))"
+
   /**
    * A function that defines how we aggregate the final accumulator results among all tasks,
    * and represent it in string for a SQL physical operator.
     */
   def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
-    // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
-    val stringMetric = if (maxMetrics.isEmpty) {
+    // taskInfo = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
+    val taskInfo = if (maxMetrics.isEmpty) {
       "(driver)"
     } else {
       s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
@@ -180,18 +178,20 @@ object SQLMetrics {
       numberFormat.format(values.sum)
     } else if (metricsType == AVERAGE_METRIC) {
       val validValues = values.filter(_ > 0)
-      val Seq(min, med, max) = {
-        val metric = if (validValues.isEmpty) {
-          val zeros = Seq.fill(3)(0L)
-          zeros.map(v => toNumberFormat(v))
-        } else {
+      // When there are only 1 metrics value (or None), no need to display max/min/median. This is
+      // common for driver-side SQL metrics.
+      if (validValues.length <= 1) {
+        toNumberFormat(validValues.headOption.getOrElse(0))
+      } else {
+        val Seq(min, med, max) = {
           Arrays.sort(validValues)
-          Seq(toNumberFormat(validValues(0)), toNumberFormat(validValues(validValues.length / 2)),
-            s"${toNumberFormat(validValues(validValues.length - 1))} $stringMetric")
+          Seq(
+            toNumberFormat(validValues(0)),
+            toNumberFormat(validValues(validValues.length / 2)),
+            toNumberFormat(validValues(validValues.length - 1)))
         }
-        metric
+        s"$METRICS_NAME_SUFFIX:\n($min, $med, $max $taskInfo)"
       }
-      s"\n($min, $med, $max)"
     } else {
       val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
         Utils.bytesToString
@@ -204,19 +204,21 @@ object SQLMetrics {
       }
 
       val validValues = values.filter(_ >= 0)
-      val Seq(sum, min, med, max) = {
-        val metric = if (validValues.isEmpty) {
-          val zeros = Seq.fill(4)(0L)
-          zeros.map(v => strFormat(v))
-        } else {
+      // When there are only 1 metrics value (or None), no need to display max/min/median. This is
+      // common for driver-side SQL metrics.
+      if (validValues.length <= 1) {
+        strFormat(validValues.headOption.getOrElse(0))
+      } else {
+        val Seq(sum, min, med, max) = {
           Arrays.sort(validValues)
-          Seq(strFormat(validValues.sum), strFormat(validValues(0)),
+          Seq(
+            strFormat(validValues.sum),
+            strFormat(validValues(0)),
             strFormat(validValues(validValues.length / 2)),
-            s"${strFormat(validValues(validValues.length - 1))} $stringMetric")
+            strFormat(validValues(validValues.length - 1)))
         }
-        metric
+        s"total $METRICS_NAME_SUFFIX\n$sum ($min, $med, $max $taskInfo)"
       }
-      s"\n$sum ($min, $med, $max)"
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 6762802..274a5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -166,7 +166,12 @@ private[ui] class SparkPlanGraphNode(
       metric <- metrics
       value <- metricsValue.get(metric.accumulatorId)
     } yield {
-      metric.name + ": " + value
+      // The value may contain ":" to extend the name, like `total (min, med, max): ...`
+      if (value.contains(":")) {
+        metric.name + " " + value
+      } else {
+        metric.name + ": " + value
+      }
     }
 
     if (values.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 11f93c8..a5b07d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen (1)", Map(
-        "duration total (min, med, max (stageId: taskId))" -> {
+        "duration" -> {
           _.toString.matches(timingMetricPattern)
         })))), true)
   }
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern),
       Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+        "avg hash probe bucket list iters" ->
           aggregateMetricsPattern))
 
     val shuffleExpected2 = Map(
@@ -181,12 +181,17 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes =
-          metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
-        // Extract min, med, max from the string and strip off everthing else.
-        val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
-        probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
-          probe => assert(probe.toDouble > 1.0)
+        val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
+        if (!probes.contains("\n")) {
+          // It's a single metrics value
+          assert(probes.toDouble > 1.0)
+        } else {
+          val mainValue = probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")
+          // Extract min, med, max from the string and strip off everthing else.
+          val index = mainValue.indexOf(" (", 0)
+          mainValue.slice(0, index).split(", ").foreach {
+            probe => assert(probe.toDouble > 1.0)
+          }
         }
       }
     }
@@ -231,13 +236,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = Seq(1, 3, 2).toDF("id").sort('id)
     testSparkPlanMetricsWithPredicates(df, 2, Map(
       0L -> (("Sort", Map(
-        "sort time total (min, med, max (stageId: taskId))" -> {
+        "sort time" -> {
           _.toString.matches(timingMetricPattern)
         },
-        "peak memory total (min, med, max (stageId: taskId))" -> {
+        "peak memory" -> {
           _.toString.matches(sizeMetricPattern)
         },
-        "spill size total (min, med, max (stageId: taskId))" -> {
+        "spill size" -> {
           _.toString.matches(sizeMetricPattern)
         })))
     ))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 766e7a9..2977b53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -46,23 +46,23 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
   protected val sizeMetricPattern = {
     val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
     val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
-    s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
+    s"(.*\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\))|($bytes)"
   }
 
   // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0):
-  // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)" OR "1.0 ms"
   protected val timingMetricPattern = {
     val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
     val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
-    s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
+    s"(.*\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\))|($duration)"
   }
 
   // Pattern of size SQLMetric value for Aggregate tests.
-  // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )"
+  // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )" OR "1"
   protected val aggregateMetricsPattern = {
     val iters = "([0-9]+(\\.[0-9]+)?)"
     val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
-    s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
+    s"(.*\\n\\($iters, $iters, $iters( $maxMetrics)?\\))|($iters)"
   }
 
   /**
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
     }
 
     val totalNumBytesMetric = executedNode.metrics.find(
-      _.name == "written output total (min, med, max (stageId: taskId))").get
+      _.name == "written output").get
     val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
       .split(" ").head.trim.toDouble
     assert(totalNumBytes > 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9f4a335..949924e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -152,11 +152,14 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
       expected.foreach { case (id, value) =>
         // The values in actual can be SQL metrics meaning that they contain additional formatting
         // when converted to string. Verify that they start with the expected value.
-        // TODO: this is brittle. There is no requirement that the actual string needs to start
-        // with the accumulator value.
         assert(actual.contains(id))
         val v = actual(id).trim
-        assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
+        if (v.contains("\n")) {
+          // The actual value can be "total (max, ...)\n6 ms (5 ms, ...)".
+          assert(v.split("\n")(1).startsWith(value.toString), s"Wrong value for accumulator $id")
+        } else {
+          assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
+        }
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org