You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/06/26 17:28:56 UTC

[spark] branch master updated: [SPARK-44026] Allow providing initial value for SQLMetrics

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 680ca2e56f2 [SPARK-44026] Allow providing initial value for SQLMetrics
680ca2e56f2 is described below

commit 680ca2e56f2c8fc759743ad6755f6e3b1a19c629
Author: Johan Lasperas <jo...@databricks.com>
AuthorDate: Mon Jun 26 10:28:35 2023 -0700

    [SPARK-44026] Allow providing initial value for SQLMetrics
    
    ### What changes were proposed in this pull request?
    This change brings a small improvement to allow creating SQLMetrics with an arbitrary initial value instead of always using `-1`.
    
    ### Why are the changes needed?
    This simplifies creating metrics when there's a need for a metric with a starting value other than `-1`. For example, the Delta MERGE INTO implementation currently creates several [timing metrics](https://github.com/delta-io/delta/blob/0bec32820e95e80b6accd2c88e4d532b1247497d/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala#L173) and manually sets them to 0 after creation.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added tests to `SQLMetricsSuite` that create metrics with initial values.
    
    Closes #41555 from johanl-db/SPARK-44026-sql-metrics-initial-value.
    
    Authored-by: Johan Lasperas <jo...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/execution/metric/SQLMetrics.scala    | 12 +++++-----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 26 ++++++++++++++++++++++
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 6d2578c3576..3326c5d4cb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -141,27 +141,27 @@ object SQLMetrics {
    * Create a metric to report the size information (including total, min, med, max) like data size,
    * spill size, etc.
    */
-  def createSizeMetric(sc: SparkContext, name: String): SQLMetric = {
+  def createSizeMetric(sc: SparkContext, name: String, initValue: Long = -1): SQLMetric = {
     // The final result of this metric in physical operator UI may look like:
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
-    val acc = new SQLMetric(SIZE_METRIC, -1)
+    val acc = new SQLMetric(SIZE_METRIC, initValue)
     acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
-  def createTimingMetric(sc: SparkContext, name: String): SQLMetric = {
+  def createTimingMetric(sc: SparkContext, name: String, initValue: Long = -1): SQLMetric = {
     // The final result of this metric in physical operator UI may looks like:
     // duration total (min, med, max):
     // 5s (800ms, 1s, 2s)
-    val acc = new SQLMetric(TIMING_METRIC, -1)
+    val acc = new SQLMetric(TIMING_METRIC, initValue)
     acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
 
-  def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
+  def createNanoTimingMetric(sc: SparkContext, name: String, initValue: Long = -1): SQLMetric = {
     // Same with createTimingMetric, just normalize the unit of time to millisecond.
-    val acc = new SQLMetric(NS_TIMING_METRIC, -1)
+    val acc = new SQLMetric(NS_TIMING_METRIC, initValue)
     acc.register(sc, name = metricsCache.get(name), countFailedValues = false)
     acc
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 877e2cadcfb..6347757e178 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -934,6 +934,32 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(windowGroupLimit.isDefined)
     assert(windowGroupLimit.get.metrics("numOutputRows").value == 2L)
   }
+
+  test("Creating metrics with initial values") {
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").value === 0)
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).value === 0)
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = 5).value === 5)
+
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m").isZero())
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = -1).isZero())
+    assert(SQLMetrics.createSizeMetric(sparkContext, name = "m", initValue = 5).isZero())
+
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").value === 0)
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = 5).value === 5)
+
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m").isZero())
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = -1).isZero())
+    assert(SQLMetrics.createTimingMetric(sparkContext, name = "m", initValue = 5).isZero())
+
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").value === 0)
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).value === 0)
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).value === 5)
+
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m").isZero())
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = -1).isZero())
+    assert(SQLMetrics.createNanoTimingMetric(sparkContext, name = "m", initValue = 5).isZero())
+  }
 }
 
 case class CustomFileCommitProtocol(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org