You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/04/26 14:20:09 UTC
[spark] branch master updated: [SPARK-35230][SQL] Move custom
metric classes to proper package
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bdac191 [SPARK-35230][SQL] Move custom metric classes to proper package
bdac191 is described below
commit bdac19184ac615355c16b5c097ac19970aca126a
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Apr 26 07:19:36 2021 -0700
[SPARK-35230][SQL] Move custom metric classes to proper package
### What changes were proposed in this pull request?
This patch moves DS v2 custom metric classes to the `org.apache.spark.sql.connector.metric` package. It moves `CustomAvgMetric` and `CustomSumMetric` into that package and makes them public Java abstract classes as well.
### Why are the changes needed?
`CustomAvgMetric` and `CustomSumMetric` should be public APIs for developers to extend. As there are a few metric classes, we should put them together in one package.
### Does this PR introduce _any_ user-facing change?
No, dev only and they are not released yet.
### How was this patch tested?
Unit tests.
Closes #32348 from viirya/move-custom-metric-classes.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../sql/kafka010/KafkaBatchPartitionReader.scala | 2 +-
.../spark/sql/kafka010/KafkaSourceProvider.scala | 3 +-
.../CustomAvgMetric.java} | 41 +++++++++-------------
.../sql/connector/{ => metric}/CustomMetric.java | 34 +++++++++---------
.../CustomSumMetric.java} | 35 ++++++------------
.../connector/{ => metric}/CustomTaskMetric.java | 18 +++++-----
.../spark/sql/connector/read/PartitionReader.java | 2 +-
.../org/apache/spark/sql/connector/read/Scan.java | 2 +-
.../spark/sql/execution/metric/CustomMetrics.scala | 33 +----------------
.../spark/sql/execution/metric/SQLMetrics.scala | 2 +-
.../sql/execution/ui/SQLAppStatusListener.scala | 2 +-
.../sql/execution/metric/CustomMetricsSuite.scala | 1 +
.../execution/ui/SQLAppStatusListenerSuite.scala | 3 +-
13 files changed, 62 insertions(+), 116 deletions(-)
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
index 0aa51a9..b6d64c7 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaBatchPartitionReader.scala
@@ -22,7 +22,7 @@ import java.{util => ju}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
-import org.apache.spark.sql.connector.CustomTaskMetric
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index c34c435..5c772ab 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -30,13 +30,12 @@ import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.connector.CustomMetric
import org.apache.spark.sql.connector.catalog.{SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomSumMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan, ScanBuilder}
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.{BatchWrite, LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.connector.write.streaming.StreamingWrite
-import org.apache.spark.sql.execution.metric.CustomSumMetric
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.internal.connector.{SimpleTableProvider, SupportsStreamingUpdateAsAppend}
import org.apache.spark.sql.sources._
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
similarity index 52%
copy from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
index bbd35ac..71e8300 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomAvgMetric.java
@@ -15,37 +15,28 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
import org.apache.spark.annotation.Evolving;
+import java.util.Arrays;
+import java.text.DecimalFormat;
+
/**
- * A custom metric. Data source can define supported custom metrics using this interface.
- * During query execution, Spark will collect the task metrics using {@link CustomTaskMetric}
- * and combine the metrics at the driver side. How to combine task metrics is defined by the
- * metric class with the same metric name.
+ * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
+ * class and override `name` and `description` to create your custom metric for real usage.
*
* @since 3.2.0
*/
@Evolving
-public interface CustomMetric {
- /**
- * Returns the name of custom metric.
- */
- String name();
-
- /**
- * Returns the description of custom metric.
- */
- String description();
-
- /**
- * The initial value of this metric.
- */
- long initialValue = 0L;
-
- /**
- * Given an array of task metric values, returns aggregated final metric value.
- */
- String aggregateTaskMetrics(long[] taskMetrics);
+public abstract class CustomAvgMetric implements CustomMetric {
+ @Override
+ public String aggregateTaskMetrics(long[] taskMetrics) {
+ if (taskMetrics.length > 0) {
+ double average = ((double)Arrays.stream(taskMetrics).sum()) / taskMetrics.length;
+ return new DecimalFormat("#0.000").format(average);
+ } else {
+ return "0";
+ }
+ }
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
similarity index 73%
copy from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
copy to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
index bbd35ac..4c4151a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomMetric.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
import org.apache.spark.annotation.Evolving;
@@ -29,23 +29,23 @@ import org.apache.spark.annotation.Evolving;
*/
@Evolving
public interface CustomMetric {
- /**
- * Returns the name of custom metric.
- */
- String name();
+ /**
+ * Returns the name of custom metric.
+ */
+ String name();
- /**
- * Returns the description of custom metric.
- */
- String description();
+ /**
+ * Returns the description of custom metric.
+ */
+ String description();
- /**
- * The initial value of this metric.
- */
- long initialValue = 0L;
+ /**
+ * The initial value of this metric.
+ */
+ long initialValue = 0L;
- /**
- * Given an array of task metric values, returns aggregated final metric value.
- */
- String aggregateTaskMetrics(long[] taskMetrics);
+ /**
+ * Given an array of task metric values, returns aggregated final metric value.
+ */
+ String aggregateTaskMetrics(long[] taskMetrics);
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
similarity index 52%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
index bbd35ac..ba28e9b 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomSumMetric.java
@@ -15,37 +15,22 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
import org.apache.spark.annotation.Evolving;
+import java.util.Arrays;
+
/**
- * A custom metric. Data source can define supported custom metrics using this interface.
- * During query execution, Spark will collect the task metrics using {@link CustomTaskMetric}
- * and combine the metrics at the driver side. How to combine task metrics is defined by the
- * metric class with the same metric name.
+ * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
+ * and override `name` and `description` to create your custom metric for real usage.
*
* @since 3.2.0
*/
@Evolving
-public interface CustomMetric {
- /**
- * Returns the name of custom metric.
- */
- String name();
-
- /**
- * Returns the description of custom metric.
- */
- String description();
-
- /**
- * The initial value of this metric.
- */
- long initialValue = 0L;
-
- /**
- * Given an array of task metric values, returns aggregated final metric value.
- */
- String aggregateTaskMetrics(long[] taskMetrics);
+public abstract class CustomSumMetric implements CustomMetric {
+ @Override
+ public String aggregateTaskMetrics(long[] taskMetrics) {
+ return String.valueOf(Arrays.stream(taskMetrics).sum());
+ }
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
similarity index 88%
rename from sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
rename to sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
index 99ea8f6..1b6f04d 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/CustomTaskMetric.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/metric/CustomTaskMetric.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.connector;
+package org.apache.spark.sql.connector.metric;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.connector.read.PartitionReader;
@@ -34,13 +34,13 @@ import org.apache.spark.sql.connector.read.PartitionReader;
*/
@Evolving
public interface CustomTaskMetric {
- /**
- * Returns the name of custom task metric.
- */
- String name();
+ /**
+ * Returns the name of custom task metric.
+ */
+ String name();
- /**
- * Returns the long value of custom task metric.
- */
- long value();
+ /**
+ * Returns the long value of custom task metric.
+ */
+ long value();
}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
index d6cf070..5286bbf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReader.java
@@ -21,7 +21,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomTaskMetric;
+import org.apache.spark.sql.connector.metric.CustomTaskMetric;
/**
* A partition reader returned by {@link PartitionReaderFactory#createReader(InputPartition)} or
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
index b70a656..0c009f5 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/Scan.java
@@ -18,7 +18,7 @@
package org.apache.spark.sql.connector.read;
import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.connector.CustomMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.types.StructType;
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
index cc28be3..f2449a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala
@@ -17,10 +17,7 @@
package org.apache.spark.sql.execution.metric
-import java.text.NumberFormat
-import java.util.Locale
-
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
object CustomMetrics {
private[spark] val V2_CUSTOM = "v2Custom"
@@ -45,31 +42,3 @@ object CustomMetrics {
}
}
}
-
-/**
- * Built-in `CustomMetric` that sums up metric values. Note that please extend this class
- * and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomSumMetric extends CustomMetric {
-
- override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- taskMetrics.sum.toString
- }
-}
-
-/**
- * Built-in `CustomMetric` that computes average of metric values. Note that please extend this
- * class and override `name` and `description` to create your custom metric for real usage.
- */
-abstract class CustomAvgMetric extends CustomMetric {
-
- override def aggregateTaskMetrics(taskMetrics: Array[Long]): String = {
- val average = if (taskMetrics.isEmpty) {
- 0.0
- } else {
- taskMetrics.sum.toDouble / taskMetrics.length
- }
- val numberFormat = NumberFormat.getNumberInstance(Locale.US)
- numberFormat.format(average)
- }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index 0e48e6e..959144b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.AccumulableInfo
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates
import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, Utils}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index a323855..e7ab4a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Status._
import org.apache.spark.scheduler._
-import org.apache.spark.sql.connector.CustomMetric
+import org.apache.spark.sql.connector.metric.CustomMetric
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric._
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
index 020f3f4..440b0dc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/CustomMetricsSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.metric
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.connector.metric.{CustomAvgMetric, CustomSumMetric}
class CustomMetricsSuite extends SparkFunSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index a582651..612b74a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -37,7 +37,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.connector.{CustomMetric, CustomTaskMetric, RangeInputPartition, SimpleScanBuilder}
+import org.apache.spark.sql.connector.{RangeInputPartition, SimpleScanBuilder}
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org