Posted to commits@spark.apache.org by li...@apache.org on 2019/01/17 18:49:58 UTC

[spark] branch master updated: [SPARK-26622][SQL] Revise SQL Metrics labels

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ede35c8  [SPARK-26622][SQL] Revise SQL Metrics labels
ede35c8 is described below

commit ede35c88e02323c69a5a6feb7e8ff4d29ffb0456
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Thu Jan 17 10:49:42 2019 -0800

    [SPARK-26622][SQL] Revise SQL Metrics labels
    
    ## What changes were proposed in this pull request?
    
    Try to make the labels more obvious (old label -> new label):
    "avg hash probe" -> avg hash probe bucket iterations
    "partition pruning time (ms)" -> dynamic partition pruning time
    "total number of files in the table" -> file count
    "number of files that would be returned by partition pruning alone" -> file count after partition pruning
    "total size of files in the table" -> file size
    "size of files that would be returned by partition pruning alone" -> file size after partition pruning
    "metadata time (ms)" -> metadata time
    "aggregate time" -> time in aggregation build (HashAggregateExec)
    "aggregate time" -> time in aggregation build (ObjectHashAggregateExec)
    "time to construct rdd bc" -> time to build
    "total time to remove rows" -> time to remove
    "total time to update rows" -> time to update
    
    Add the proper metric type to some metrics:
    "bytes of written output" -> written output  (now createSizeMetric)
    "metadata time"  (now createTimingMetric)
    "dataSize"  (now createSizeMetric)
    "collectTime"  (now createTimingMetric)
    "buildTime"  (now createTimingMetric)
    "broadcastTime"  (now createTimingMetric)
    (A short, hedged sketch of these SQLMetrics helpers appears after the diffstat below.)
    
    ## How is this patch tested?
    
    Existing tests.
    
    Author: Stacy Kerkela <stacy.kerkela@databricks.com>
    Signed-off-by: Juliusz Sompolski <julek@databricks.com>
    
    Closes #23551 from juliuszsompolski/SPARK-26622.
    
    Lead-authored-by: Juliusz Sompolski <ju...@databricks.com>
    Co-authored-by: Stacy Kerkela <st...@databricks.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../java/org/apache/spark/unsafe/map/BytesToBytesMap.java    |  2 +-
 .../spark/sql/execution/UnsafeFixedWidthAggregationMap.java  |  6 +++---
 .../org/apache/spark/sql/execution/DataSourceScanExec.scala  |  4 ++--
 .../spark/sql/execution/aggregate/HashAggregateExec.scala    |  7 ++++---
 .../sql/execution/aggregate/ObjectHashAggregateExec.scala    |  2 +-
 .../execution/aggregate/TungstenAggregationIterator.scala    |  2 +-
 .../apache/spark/sql/execution/basicPhysicalOperators.scala  |  4 ++--
 .../sql/execution/datasources/BasicWriteStatsTracker.scala   |  2 +-
 .../spark/sql/execution/exchange/BroadcastExchangeExec.scala |  8 ++++----
 .../spark/sql/execution/streaming/statefulOperators.scala    |  4 ++--
 .../apache/spark/sql/execution/metric/SQLMetricsSuite.scala  | 12 ++++++------
 .../spark/sql/execution/metric/SQLMetricsTestUtils.scala     |  6 ++++--
 12 files changed, 31 insertions(+), 28 deletions(-)
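
A note on the "proper metric type" part of the message above: the SQLMetrics helper an
operator uses determines how the value is rendered in the SQL web UI (roughly: raw count,
byte size, duration, or min/med/max across tasks). Below is a minimal, hedged Scala sketch
of the four helpers this patch touches; the object, metric keys and labels are only
examples and are not part of the patch.

  import org.apache.spark.SparkContext
  import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

  object MetricsExample {
    // Illustrative metric map for a hypothetical operator; keys/labels are examples.
    def exampleMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map(
      // plain counter: rendered as a raw number
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      // size metric: rendered with byte units, as "total (min, med, max)" across tasks
      "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
      // timing metric: values accumulated in milliseconds, rendered as durations
      "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
      // average metric: rendered as per-task (min, med, max)
      "avgHashProbe" ->
        SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
  }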

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 2ff98a6..13ca7fb 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -854,7 +854,7 @@ public final class BytesToBytesMap extends MemoryConsumer {
   /**
    * Returns the average number of probes per key lookup.
    */
-  public double getAverageProbesPerLookup() {
+  public double getAvgHashProbeBucketListIterations() {
     return (1.0 * numProbes) / numKeyLookups;
   }
 
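(Context for the rename above, not part of the patch: the value is the total number of
bucket-list iterations, i.e. probes, divided by the number of key lookups. A worked
example with made-up numbers:)

  // Illustrative numbers only: 15 bucket-list iterations across 10 key lookups
  // average out to 1.5 iterations per lookup; values near 1.0 mean lookups
  // rarely had to walk a collision chain.
  val numProbes = 15L
  val numKeyLookups = 10L
  val avgIterations = (1.0 * numProbes) / numKeyLookups  // = 1.5
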
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index 7e76a65..117e98f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -226,10 +226,10 @@ public final class UnsafeFixedWidthAggregationMap {
   }
 
   /**
-   * Gets the average hash map probe per looking up for the underlying `BytesToBytesMap`.
+   * Gets the average bucket list iterations per lookup in the underlying `BytesToBytesMap`.
    */
-  public double getAverageProbesPerLookup() {
-    return map.getAverageProbesPerLookup();
+  public double getAvgHashProbeBucketListIterations() {
+    return map.getAvgHashProbeBucketListIterations();
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index f6f3fb1..f852a52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -319,8 +319,8 @@ case class FileSourceScanExec(
 
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
-      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
+      "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 220a4b0..19a47ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -63,8 +63,9 @@ case class HashAggregateExec(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
     "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"),
-    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time"),
-    "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build"),
+    "avgHashProbe" ->
+      SQLMetrics.createAverageMetric(sparkContext, "avg hash probe bucket list iters"))
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
@@ -362,7 +363,7 @@ case class HashAggregateExec(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Update average hashmap probe
-    avgHashProbe.set(hashMap.getAverageProbesPerLookup())
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
 
     if (sorter == null) {
       // not spilled
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index bd52c63..5b340ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -77,7 +77,7 @@ case class ObjectHashAggregateExec(
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "aggregate time")
+    "aggTime" -> SQLMetrics.createTimingMetric(sparkContext, "time in aggregation build")
   )
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 6d849869..6dc6465 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -387,7 +387,7 @@ class TungstenAggregationIterator(
     metrics.incPeakExecutionMemory(maxMemory)
 
     // Updating average hashmap probe
-    avgHashProbe.set(hashMap.getAverageProbesPerLookup())
+    avgHashProbe.set(hashMap.getAvgHashProbeBucketListIterations)
   })
 
   ///////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 318dca0..4352721 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -663,8 +663,8 @@ object CoalesceExec {
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
-    "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"))
 
   override def output: Seq[Attribute] = child.output
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
index ba7d2b7..b71c2d1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala
@@ -173,7 +173,7 @@ object BasicWriteJobStatsTracker {
     val sparkContext = SparkContext.getActive.get
     Map(
       NUM_FILES_KEY -> SQLMetrics.createMetric(sparkContext, "number of written files"),
-      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createMetric(sparkContext, "bytes of written output"),
+      NUM_OUTPUT_BYTES_KEY -> SQLMetrics.createSizeMetric(sparkContext, "written output"),
       NUM_OUTPUT_ROWS_KEY -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       NUM_PARTS_KEY -> SQLMetrics.createMetric(sparkContext, "number of dynamic part")
     )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
index 703d351..d55d4fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
@@ -44,10 +44,10 @@ case class BroadcastExchangeExec(
     child: SparkPlan) extends Exchange {
 
   override lazy val metrics = Map(
-    "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
-    "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"),
-    "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"),
-    "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)"))
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
+    "collectTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to collect"),
+    "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"),
+    "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast"))
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
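(Side note on the timing metrics declared above, not part of the patch: values recorded
into a metric created with createTimingMetric are interpreted as milliseconds, so callers
typically measure elapsed nanoseconds and add the converted value. A hedged sketch,
assuming SQLMetric's += accumulator API; the helper name is made up:)

  import org.apache.spark.sql.execution.metric.SQLMetric

  // Hypothetical helper: run `body`, add the elapsed time in ms to `timingMetric`.
  def timed[T](timingMetric: SQLMetric)(body: => T): T = {
    val start = System.nanoTime()
    val result = body
    timingMetric += (System.nanoTime() - start) / 1000000  // elapsed ms
    result
  }
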
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index c11af34..d689a6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -79,8 +79,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
     "numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
     "numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
-    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to update rows"),
-    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "total time to remove rows"),
+    "allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
+    "allRemovalsTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to remove"),
     "commitTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to commit changes"),
     "stateMemory" -> SQLMetrics.createSizeMetric(sparkContext, "memory used by state")
   ) ++ stateStoreCustomMetrics
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 6174ec4..98a8ad5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -96,9 +96,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 1L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
       "local blocks read" -> 2L,
@@ -114,9 +114,9 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"),
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"),
       Map("number of output rows" -> 3L,
-        "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
+        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected2 = Map(
       "records read" -> 4L,
       "local blocks read" -> 4L,
@@ -162,7 +162,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe (min, med, max)")
+        val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max)")
         probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
           assert(probe.toDouble > 1.0)
         }
@@ -570,7 +570,7 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       testSparkPlanMetrics(df, 1, Map(
         0L -> (("Scan parquet default.testdataforscan", Map(
           "number of output rows" -> 3L,
-          "number of files" -> 2L))))
+          "number of files read" -> 2L))))
       )
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 0e13f7d..f12eeaa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -84,8 +84,10 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       assert(metricValue == expected)
     }
 
-    val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get
-    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt
+    val totalNumBytesMetric = executedNode.metrics.find(
+      _.name == "written output total (min, med, max)").get
+    val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
+      .split(" ").head.trim.toDouble
     assert(totalNumBytes > 0)
   }
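
(Note on the assertion change above, not part of the patch: because "written output" is
now a size metric, the test reads the rendered metric string and parses the leading total
instead of a raw byte count. A minimal sketch of that parsing, using an assumed sample
rendering; the real string is produced by SQLMetrics and may differ in units and spacing:)

  // Assumed sample rendering of a size metric node ("total (min, med, max)").
  val sample = "\n58.0 B (19.0 B, 19.0 B, 20.0 B)"
  val totalNumBytes = sample.replaceAll(",", "").split(" ").head.trim.toDouble
  assert(totalNumBytes == 58.0)  // leading total parsed as a Double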
 

