Posted to commits@spark.apache.org by do...@apache.org on 2021/04/26 14:20:09 UTC

[spark] branch master updated: [SPARK-35230][SQL] Move custom metric classes to proper package

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bdac191  [SPARK-35230][SQL] Move custom metric classes to proper package
bdac191 is described below

commit bdac19184ac615355c16b5c097ac19970aca126a
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Apr 26 07:19:36 2021 -0700

    [SPARK-35230][SQL] Move custom metric classes to proper package
    
    ### What changes were proposed in this pull request?
    
    This patch moves the DS v2 custom metric classes to the `org.apache.spark.sql.connector.metric` package. It also moves `CustomAvgMetric` and `CustomSumMetric` into that package and makes them public Java abstract classes.
    
    ### Why are the changes needed?
    
    `CustomAvgMetric` and `CustomSumMetric` should be public APIs that developers can extend. As there are a few metric classes, we should put them together in one package (an illustrative sketch follows the file summary below).
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. This is dev-only, and these classes have not been released yet.
    
    ### How was this patch tested?
    
    Unit tests.
    
    Closes #32348 from viirya/move-custom-metric-classes.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/kafka010/KafkaBatchPartitionReader.scala   |  2 +-
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  3 +-
 .../CustomAvgMetric.java}                          | 41 +++++++++-------------
 .../sql/connector/{ => metric}/CustomMetric.java   | 34 +++++++++---------
 .../CustomSumMetric.java}                          | 35 ++++++------------
 .../connector/{ => metric}/CustomTaskMetric.java   | 18 +++++-----
 .../spark/sql/connector/read/PartitionReader.java  |  2 +-
 .../org/apache/spark/sql/connector/read/Scan.java  |  2 +-
 .../spark/sql/execution/metric/CustomMetrics.scala | 33 +----------------
 .../spark/sql/execution/metric/SQLMetrics.scala    |  2 +-
 .../sql/execution/ui/SQLAppStatusListener.scala    |  2 +-
 .../sql/execution/metric/CustomMetricsSuite.scala  |  1 +
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  3 +-
 13 files changed, 62 insertions(+), 116 deletions(-)
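
A quick, illustrative sketch (not part of this commit) of what the relocated API looks like to a connector author: a driver-side metric is defined by extending `CustomSumMetric` (or `CustomAvgMetric`) and supplying `name` and `description`. The package and class names below come from the patch; `BytesReadMetric` and its strings are made-up examples.

import org.apache.spark.sql.connector.metric.CustomSumMetric;

// Hypothetical connector metric: CustomSumMetric already implements
// aggregateTaskMetrics (summing the per-task values), so a subclass only
// needs to provide the metric's name and description.
public class BytesReadMetric extends CustomSumMetric {
  @Override
  public String name() { return "bytesRead"; }

  @Override
  public String description() { return "total number of bytes read"; }
}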

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 0aa51a9..b6d64c7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.connector.CustomTaskMetric
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c34c435..5c772ab 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,13 +30,12 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.kafka010.KafkaConfigUpdater
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.CustomMetric
 import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric}
 import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
 import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.execution.metric.CustomSumMetric
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
 import org.apache.spark.sql.sources._
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
similarity index 52%
copy from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
index bbd35ac..71e8300 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
@@ -15,37 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
 
 import org.apache.spark.annotation.Evolving;
 
+import java.util.Arrays;
+import java.text.DecimalFormat;
+
 /**
- * A custom metric. Data source can define supported custom metrics using this interface.
- * During query execution, Spark will collect the task metrics using {@link CustomTaskMetric}
- * and combine the metrics at the driver side. How to combine task metrics is defined by the
- * metric class with the same metric name.
+ * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
+ * class and override `name` and `description` to create your custom metric for real usage.
  *
  * @since 3.2.0
  */
 @Evolving
-public interface CustomMetric {
-    /**
-     * Returns the name of custom metric.
-     */
-    String name();
-
-    /**
-     * Returns the description of custom metric.
-     */
-    String description();
-
-    /**
-     * The initial value of this metric.
-     */
-    long initialValue = 0L;
-
-    /**
-     * Given an array of task metric values, returns aggregated final metric value.
-     */
-    String aggregateTaskMetrics(long[] taskMetrics);
+public abstract class CustomAvgMetric implements CustomMetric {
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    if (taskMetrics.length > 0) {
+      double average = ((double)Arrays.stream(taskMetrics).sum()) / taskMetrics.length;
+      return new DecimalFormat("#0.000").format(average);
+    } else {
+      return "0";
+    }
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
similarity index 73%
copy from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
index bbd35ac..4c4151a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
 
 import org.apache.spark.annotation.Evolving;
 
@@ -29,23 +29,23 @@ import org.apache.spark.annotation.Evolving;
  */
 @Evolving
 public interface CustomMetric {
-    /**
-     * Returns the name of custom metric.
-     */
-    String name();
+  /**
+   * Returns the name of custom metric.
+   */
+  String name();
 
-    /**
-     * Returns the description of custom metric.
-     */
-    String description();
+  /**
+   * Returns the description of custom metric.
+   */
+  String description();
 
-    /**
-     * The initial value of this metric.
-     */
-    long initialValue = 0L;
+  /**
+   * The initial value of this metric.
+   */
+  long initialValue = 0L;
 
-    /**
-     * Given an array of task metric values, returns aggregated final metric value.
-     */
-    String aggregateTaskMetrics(long[] taskMetrics);
+  /**
+   * Given an array of task metric values, returns aggregated final metric value.
+   */
+  String aggregateTaskMetrics(long[] taskMetrics);
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
similarity index 52%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
index bbd35ac..ba28e9b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
@@ -15,37 +15,22 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
 
 import org.apache.spark.annotation.Evolving;
 
+import java.util.Arrays;
+
 /**
- * A custom metric. Data source can define supported custom metrics using this interface.
- * During query execution, Spark will collect the task metrics using {@link CustomTaskMetric}
- * and combine the metrics at the driver side. How to combine task metrics is defined by the
- * metric class with the same metric name.
+ * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
+ * and override `name` and `description` to create your custom metric for real usage.
  *
  * @since 3.2.0
  */
 @Evolving
-public interface CustomMetric {
-    /**
-     * Returns the name of custom metric.
-     */
-    String name();
-
-    /**
-     * Returns the description of custom metric.
-     */
-    String description();
-
-    /**
-     * The initial value of this metric.
-     */
-    long initialValue = 0L;
-
-    /**
-     * Given an array of task metric values, returns aggregated final metric value.
-     */
-    String aggregateTaskMetrics(long[] taskMetrics);
+public abstract class CustomSumMetric implements CustomMetric {
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return String.valueOf(Arrays.stream(taskMetrics).sum());
+  }
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
similarity index 88%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
index 99ea8f6..1b6f04d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
 
 import org.apache.spark.annotation.Evolving;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -34,13 +34,13 @@ import org.apache.spark.sql.connector.read.PartitionReader;
  */
 @Evolving
 public interface CustomTaskMetric {
-    /**
-     * Returns the name of custom task metric.
-     */
-    String name();
+  /**
+   * Returns the name of custom task metric.
+   */
+  String name();
 
-    /**
-     * Returns the long value of custom task metric.
-     */
-    long value();
+  /**
+   * Returns the long value of custom task metric.
+   */
+  long value();
 }
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index d6cf070..5286bbf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,7 +21,7 @@ import java.io.Closeable;
 import java.io.IOException;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomTaskMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
 
 /**
  * A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index b70a656..0c009f5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector.read;
 
 import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.types.StructType;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
index cc28be3..f2449a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.sql.execution.metric
 
-import java.text.NumberFormat
-import java.util.Locale
-
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 
 object CustomMetrics {
   private[spark] val V2_CUSTOM = "v2Custom"
@@ -45,31 +42,3 @@ object CustomMetrics {
     }
   }
 }
-
-/**
- * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
- * and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomSumMetric extends CustomMetric {
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    taskMetrics.sum.toString
-  }
-}
-
-/**
- * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
- * class and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomAvgMetric extends CustomMetric {
-
-  override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
-    val average = if (taskMetrics.isEmpty) {
-      0.0
-    } else {
-      taskMetrics.sum.toDouble / taskMetrics.length
-    }
-    val numberFormat = NumberFormat.getNumberInstance(Locale.US)
-    numberFormat.format(average)
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 0e48e6e..959144b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
 import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index a323855..e7ab4a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
index 020f3f4..440b0dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.metric
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric}
 
 class CustomMetricsSuite extends SparkFunSuite {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index a582651..612b74a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.connector.{CustomMetric, CustomTaskMetric, RangeInputPartition, SimpleScanBuilder}
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleScanBuilder}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
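
On the task side (also illustrative, not part of this commit), a connector reports per-task values through the relocated `CustomTaskMetric` interface; Spark collects these values and combines them on the driver using the `CustomMetric` with the same name, as described in the interface javadoc above. Only the interface and its `name`/`value` methods come from the patch; `BytesReadTaskMetric` is a made-up example matching the driver-side sketch above.

import org.apache.spark.sql.connector.metric.CustomTaskMetric;

// Hypothetical task-side metric: one long value per task, matched to the
// driver-side CustomMetric by name ("bytesRead" here) for aggregation.
public class BytesReadTaskMetric implements CustomTaskMetric {
  private final long bytesRead;

  public BytesReadTaskMetric(long bytesRead) {
    this.bytesRead = bytesRead;
  }

  @Override
  public String name() { return "bytesRead"; }

  @Override
  public long value() { return bytesRead; }
}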
