Posted to commits@spark.apache.org by do...@apache.org on 2020/03/27 22:47:19 UTC
[spark] branch branch-3.0 updated: [SPARK-31271][UI] fix web ui for driver side SQL metrics
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 7c90ec0 [SPARK-31271][UI] fix web ui for driver side SQL metrics
7c90ec0 is described below
commit 7c90ec065f81c3933eef1f0dd172f1a518b1232b
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Mar 27 15:45:35 2020 -0700
[SPARK-31271][UI] fix web ui for driver side SQL metrics
### What changes were proposed in this pull request?
In https://github.com/apache/spark/pull/23551, we changed the type of driver-side SQL metrics to size/time metrics, which come with max/min/median info.
This doesn't make sense for driver-side SQL metrics, as they have only one value, and it makes the web UI hard to read:
![image](https://user-images.githubusercontent.com/3182036/77653892-42db9900-6fab-11ea-8e7f-92f763fa32ff.png)
This PR updates the SQL metrics UI to display max/min/median only when there is more than one metric value:
![image](https://user-images.githubusercontent.com/3182036/77653975-5f77d100-6fab-11ea-849e-64c935377c8e.png)
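For illustration (reconstructed from the formatting code in this diff, not a verbatim UI capture), a driver-side timing metric with a single value changes roughly like this:

```
Before: duration total (min, med, max (stageId: taskId)): 5 ms (5 ms, 5 ms, 5 ms (driver))
After:  duration: 5 ms
```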
### Why are the changes needed?
Makes the UI easier to read
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
manual test
Closes #28037 from cloud-fan/ui.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit c4e98c065c99d2cf840e6006ee5414fbaaba9937)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/execution/metric/SQLMetrics.scala | 60 +++++++++++-----------
.../spark/sql/execution/ui/SparkPlanGraph.scala | 7 ++-
.../sql/execution/metric/SQLMetricsSuite.scala | 33 +++++++-----
.../sql/execution/metric/SQLMetricsTestUtils.scala | 12 ++---
.../execution/ui/SQLAppStatusListenerSuite.scala | 9 ++--
5 files changed, 68 insertions(+), 53 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 1394e0f..92d2179 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -116,26 +116,23 @@ object SQLMetrics {
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
val acc = new SQLMetric(SIZE_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
- countFailedValues = false)
+ acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
// The final result of this metric in the physical operator UI may look like:
- // duration(min, med, max):
+ // duration total (min, med, max):
// 5s (800ms, 1s, 2s)
val acc = new SQLMetric(TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
- countFailedValues = false)
+ acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
// Same as createTimingMetric, but normalizes the time unit to milliseconds.
val acc = new SQLMetric(NS_TIMING_METRIC, -1)
- acc.register(sc, name = Some(s"$name total (min, med, max (stageId: taskId))"),
- countFailedValues = false)
+ acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
@@ -150,8 +147,7 @@ object SQLMetrics {
// probe avg (min, med, max):
// (1.2, 2.2, 6.3)
val acc = new SQLMetric(AVERAGE_METRIC)
- acc.register(sc, name = Some(s"$name (min, med, max (stageId: taskId))"),
- countFailedValues = false)
+ acc.register(sc, name = Some(name), countFailedValues = false)
acc
}
@@ -164,13 +160,15 @@ object SQLMetrics {
metricsType != SUM_METRIC
}
+ private val METRICS_NAME_SUFFIX = "(min, med, max (stageId: taskId))"
+
/**
* A function that defines how we aggregate the final accumulator results among all tasks,
* and represent it in string for a SQL physical operator.
*/
def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
- // stringMetric = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
- val stringMetric = if (maxMetrics.isEmpty) {
+ // taskInfo = "(driver)" OR (stage ${stageId}.${attemptId}: task $taskId)
+ val taskInfo = if (maxMetrics.isEmpty) {
"(driver)"
} else {
s"(stage ${maxMetrics(1)}.${maxMetrics(2)}: task ${maxMetrics(3)})"
@@ -180,18 +178,20 @@ object SQLMetrics {
numberFormat.format(values.sum)
} else if (metricsType == AVERAGE_METRIC) {
val validValues = values.filter(_ > 0)
- val Seq(min, med, max) = {
- val metric = if (validValues.isEmpty) {
- val zeros = Seq.fill(3)(0L)
- zeros.map(v => toNumberFormat(v))
- } else {
+ // When there is only one metric value (or none), there is no need to display
+ // max/min/median. This is common for driver-side SQL metrics.
+ if (validValues.length <= 1) {
+ toNumberFormat(validValues.headOption.getOrElse(0))
+ } else {
+ val Seq(min, med, max) = {
Arrays.sort(validValues)
- Seq(toNumberFormat(validValues(0)), toNumberFormat(validValues(validValues.length / 2)),
- s"${toNumberFormat(validValues(validValues.length - 1))} $stringMetric")
+ Seq(
+ toNumberFormat(validValues(0)),
+ toNumberFormat(validValues(validValues.length / 2)),
+ toNumberFormat(validValues(validValues.length - 1)))
}
- metric
+ s"$METRICS_NAME_SUFFIX:\n($min, $med, $max $taskInfo)"
}
- s"\n($min, $med, $max)"
} else {
val strFormat: Long => String = if (metricsType == SIZE_METRIC) {
Utils.bytesToString
@@ -204,19 +204,21 @@ object SQLMetrics {
}
val validValues = values.filter(_ >= 0)
- val Seq(sum, min, med, max) = {
- val metric = if (validValues.isEmpty) {
- val zeros = Seq.fill(4)(0L)
- zeros.map(v => strFormat(v))
- } else {
+ // When there is only one metric value (or none), there is no need to display
+ // max/min/median. This is common for driver-side SQL metrics.
+ if (validValues.length <= 1) {
+ strFormat(validValues.headOption.getOrElse(0))
+ } else {
+ val Seq(sum, min, med, max) = {
Arrays.sort(validValues)
- Seq(strFormat(validValues.sum), strFormat(validValues(0)),
+ Seq(
+ strFormat(validValues.sum),
+ strFormat(validValues(0)),
strFormat(validValues(validValues.length / 2)),
- s"${strFormat(validValues(validValues.length - 1))} $stringMetric")
+ strFormat(validValues(validValues.length - 1)))
}
- metric
+ s"total $METRICS_NAME_SUFFIX\n$sum ($min, $med, $max $taskInfo)"
}
- s"\n$sum ($min, $med, $max)"
}
}
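For a concrete picture of the new rule in `stringValue`, here is a self-contained Scala sketch (an illustration, not the Spark source; `fmt` stands in for Spark's byte/time formatters such as `Utils.bytesToString`):

```scala
import java.util.Arrays

object MetricFormatSketch {
  // One valid value (the common driver-side case) is printed bare; multiple
  // values get the "total (min, med, max ...)" summary, with the max task's
  // stage/task info appended.
  def format(values: Array[Long], taskInfo: String, fmt: Long => String): String = {
    val validValues = values.filter(_ >= 0)
    if (validValues.length <= 1) {
      fmt(validValues.headOption.getOrElse(0L))
    } else {
      Arrays.sort(validValues)
      val sum = fmt(validValues.sum)
      val min = fmt(validValues(0))
      val med = fmt(validValues(validValues.length / 2))
      val max = fmt(validValues(validValues.length - 1))
      s"total (min, med, max (stageId: taskId))\n$sum ($min, $med, $max $taskInfo)"
    }
  }

  def main(args: Array[String]): Unit = {
    val ms: Long => String = v => s"$v ms"
    println(format(Array(5L), "(driver)", ms))
    // 5 ms
    println(format(Array(800L, 1000L, 2000L), "(stage 3.0: task 217)", ms))
    // total (min, med, max (stageId: taskId))
    // 3800 ms (800 ms, 1000 ms, 2000 ms (stage 3.0: task 217))
  }
}
```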
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 6762802..274a5a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -166,7 +166,12 @@ private[ui] class SparkPlanGraphNode(
metric <- metrics
value <- metricsValue.get(metric.accumulatorId)
} yield {
- metric.name + ": " + value
+ // The value may contain ":" to extend the name, like `total (min, med, max): ...`
+ if (value.contains(":")) {
+ metric.name + " " + value
+ } else {
+ metric.name + ": " + value
+ }
}
if (values.nonEmpty) {
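The effect of this change, as a tiny sketch (illustrative, mirroring the logic above):

```scala
// A metric value that already carries a header such as
// "total (min, med, max (stageId: taskId))" contains ":" and extends the
// metric name directly; a plain value keeps the usual ": " separator.
def label(name: String, value: String): String =
  if (value.contains(":")) s"$name $value" else s"$name: $value"

label("duration", "5 ms")
// "duration: 5 ms"
label("duration", "total (min, med, max (stageId: taskId))\n5 s (1 s, 2 s, 2 s (stage 3.0: task 217))")
// "duration total (min, med, max (stageId: taskId))\n5 s (...)"
```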
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 11f93c8..a5b07d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -98,7 +98,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val ds = spark.range(10).filter('id < 5)
testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
0L -> (("WholeStageCodegen (1)", Map(
- "duration total (min, med, max (stageId: taskId))" -> {
+ "duration" -> {
_.toString.matches(timingMetricPattern)
})))), true)
}
@@ -110,10 +110,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = testData2.groupBy().count() // 2 partitions
val expected1 = Seq(
Map("number of output rows" -> 2L,
- "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern),
Map("number of output rows" -> 1L,
- "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern))
val shuffleExpected1 = Map(
"records read" -> 2L,
@@ -130,10 +130,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df2 = testData2.groupBy('a).count()
val expected2 = Seq(
Map("number of output rows" -> 4L,
- "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern),
Map("number of output rows" -> 3L,
- "avg hash probe bucket list iters (min, med, max (stageId: taskId))" ->
+ "avg hash probe bucket list iters" ->
aggregateMetricsPattern))
val shuffleExpected2 = Map(
@@ -181,12 +181,17 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
}
val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
nodeIds.foreach { nodeId =>
- val probes =
- metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId: taskId))")
- // Extract min, med, max from the string and strip off everything else.
- val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
- probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
- probe => assert(probe.toDouble > 1.0)
+ val probes = metrics(nodeId)._2("avg hash probe bucket list iters").toString
+ if (!probes.contains("\n")) {
+ // It's a single metrics value
+ assert(probes.toDouble > 1.0)
+ } else {
+ val mainValue = probes.split("\n").apply(1).stripPrefix("(").stripSuffix(")")
+ // Extract min, med, max from the string and strip off everything else.
+ val index = mainValue.indexOf(" (", 0)
+ mainValue.slice(0, index).split(", ").foreach {
+ probe => assert(probe.toDouble > 1.0)
+ }
}
}
}
@@ -231,13 +236,13 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
val df = Seq(1, 3, 2).toDF("id").sort('id)
testSparkPlanMetricsWithPredicates(df, 2, Map(
0L -> (("Sort", Map(
- "sort time total (min, med, max (stageId: taskId))" -> {
+ "sort time" -> {
_.toString.matches(timingMetricPattern)
},
- "peak memory total (min, med, max (stageId: taskId))" -> {
+ "peak memory" -> {
_.toString.matches(sizeMetricPattern)
},
- "spill size total (min, med, max (stageId: taskId))" -> {
+ "spill size" -> {
_.toString.matches(sizeMetricPattern)
})))
))
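To make the new parsing in the probe test above concrete, a standalone sketch with a made-up sample string in the new two-line AVERAGE format:

```scala
// Made-up sample of the new AVERAGE format:
// "(min, med, max (stageId: taskId)):\n(<min>, <med>, <max> <taskInfo>)"
val probes = "(min, med, max (stageId: taskId)):\n(1.2, 2.2, 6.3 (stage 1.0: task 8))"
if (!probes.contains("\n")) {
  // Single metric value (e.g. driver-side): parse it directly.
  assert(probes.toDouble > 1.0)
} else {
  // Keep only "1.2, 2.2, 6.3" from the second line, then check each number.
  val mainValue = probes.split("\n")(1).stripPrefix("(").stripSuffix(")")
  val nums = mainValue.slice(0, mainValue.indexOf(" (")).split(", ")
  nums.foreach(n => assert(n.toDouble > 1.0))
}
```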
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 766e7a9..2977b53 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -46,23 +46,23 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
protected val sizeMetricPattern = {
val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
- s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
+ s"(.*\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\))|($bytes)"
}
// Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0):
- // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+ // task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)" OR "1.0 ms"
protected val timingMetricPattern = {
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
- s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
+ s"(.*\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\))|($duration)"
}
// Pattern of size SQLMetric value for Aggregate tests.
- // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )"
+ // e.g "\n(1, 1, 0.9 (stage 1.0: task 8)) OR "\n(1, 1, 0.9 )" OR "1"
protected val aggregateMetricsPattern = {
val iters = "([0-9]+(\\.[0-9]+)?)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
- s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
+ s"(.*\\n\\($iters, $iters, $iters( $maxMetrics)?\\))|($iters)"
}
/**
@@ -98,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}
val totalNumBytesMetric = executedNode.metrics.find(
- _.name == "written output total (min, med, max (stageId: taskId))").get
+ _.name == "written output").get
val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
.split(" ").head.trim.toDouble
assert(totalNumBytes > 0)
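As a quick, self-contained sanity check that the relaxed timing pattern accepts both shapes (the sample strings here are my own, not taken from the suite):

```scala
val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
val maxMetrics = "\\(stage ([0-9])+\\.([0-9])+\\: task ([0-9])+\\)"
// Either the multi-line "total (min, med, max ...)" summary or a bare value.
val timingMetricPattern =
  s"(.*\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\))|($duration)"

assert("1.0 ms".matches(timingMetricPattern))
assert("total (min, med, max (stageId: taskId))\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
  .matches(timingMetricPattern))
assert("total (min, med, max (stageId: taskId))\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3.0: task 217))"
  .matches(timingMetricPattern))
```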
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9f4a335..949924e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -152,11 +152,14 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
expected.foreach { case (id, value) =>
// The values in actual can be SQL metrics meaning that they contain additional formatting
// when converted to string. Verify that they start with the expected value.
- // TODO: this is brittle. There is no requirement that the actual string needs to start
- // with the accumulator value.
assert(actual.contains(id))
val v = actual(id).trim
- assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
+ if (v.contains("\n")) {
+ // The actual value can be "total (max, ...)\n6 ms (5 ms, ...)".
+ assert(v.split("\n")(1).startsWith(value.toString), s"Wrong value for accumulator $id")
+ } else {
+ assert(v.startsWith(value.toString), s"Wrong value for accumulator $id")
+ }
}
}
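The relaxed assertion, sketched standalone with made-up values:

```scala
// For multi-line metric strings, the numeric value now sits on the second
// line, after the "total (min, med, max ...)" header.
def valueLine(v: String): String =
  if (v.contains("\n")) v.split("\n")(1) else v

assert(valueLine("total (min, med, max (stageId: taskId))\n6 ms (5 ms, 6 ms, 7 ms)").startsWith("6"))
assert(valueLine("6 ms").startsWith("6"))
```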