Posted to commits@spark.apache.org by we...@apache.org on 2018/12/12 02:04:17 UTC
[spark] branch master updated: [SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text changes
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bd8da37 [SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text changes
bd8da37 is described below
commit bd8da3799dd160771ebb3ea55b7678b644248425
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Wed Dec 12 10:03:50 2018 +0800
[SPARK-26193][SQL][FOLLOW UP] Read metrics rename and display text changes
## What changes were proposed in this pull request?
Follow-up PR for #23207, including the following changes (a minimal usage sketch follows the list):
- Rename `SQLShuffleMetricsReporter` to `SQLShuffleReadMetricsReporter` so that it matches the write-side naming.
- Change the read-side display text for naming consistency.
- Rename the `writeProcess` function in `ShuffleWriteProcessor` to `write`.
- Remove the `private[spark]` modifiers in the execution package.
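For context, here is a minimal sketch of how the renamed reporters are wired up after this change. The two helper methods and their names are hypothetical and only illustrate usage; the reporter classes, their factory methods, and `createTempShuffleReadMetrics` come from this patch and the files it touches. These are Spark-internal APIs, so real code like this lives inside the `org.apache.spark` namespace (e.g. `ShuffleExchangeExec` and `ShuffledRowRDD`), not in user code.
```scala
import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}

// Operator side: declare both read- and write-side shuffle metrics
// (mirrors ShuffleExchangeExec / CollectLimitExec after the rename).
def operatorShuffleMetrics(sc: SparkContext): Map[String, SQLMetric] =
  SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sc) ++
    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sc)

// Task side: wrap the task-level temp read metrics so that both the
// per-operator SQL metrics and the basic shuffle read metrics are updated
// (mirrors ShuffledRowRDD.compute after the rename).
def readReporter(
    context: TaskContext,
    metrics: Map[String, SQLMetric]): SQLShuffleReadMetricsReporter = {
  val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
  new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
}
```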
## How was this patch tested?
Existing tests.
Closes #23286 from xuanyuanking/SPARK-26193-follow.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../apache/spark/scheduler/ShuffleMapTask.scala | 2 +-
.../spark/shuffle/ShuffleWriteProcessor.scala | 2 +-
.../spark/sql/execution/ShuffledRowRDD.scala | 6 ++--
.../execution/exchange/ShuffleExchangeExec.scala | 4 +--
.../org/apache/spark/sql/execution/limit.scala | 6 ++--
.../metric/SQLShuffleMetricsReporter.scala | 36 +++++++++++-----------
.../sql/execution/UnsafeRowSerializerSuite.scala | 4 +--
.../sql/execution/metric/SQLMetricsSuite.scala | 20 ++++++------
8 files changed, 40 insertions(+), 40 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2a8d1dd..35664ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -92,7 +92,7 @@ private[spark] class ShuffleMapTask(
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
- dep.shuffleWriterProcessor.writeProcess(rdd, dep, partitionId, context, partition)
+ dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
}
override def preferredLocations: Seq[TaskLocation] = preferredLocs
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
index f521315..5b0c7e9 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
@@ -41,7 +41,7 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
* get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for
* this task.
*/
- def writeProcess(
+ def write(
rdd: RDD[_],
dep: ShuffleDependency[_, _, _],
partitionId: Int,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 9b05faa..079ff25 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -22,7 +22,7 @@ import java.util.Arrays
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
/**
* The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -157,9 +157,9 @@ class ShuffledRowRDD(
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
- // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+ // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
// as well as the `tempMetrics` for basic shuffle metrics.
- val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
+ val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
// The range of pre-shuffle partitions that we are fetching at here is
// [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
val reader =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 0c20205..da7b0c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.MutablePair
@@ -50,7 +50,7 @@ case class ShuffleExchangeExec(
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
- SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = Map(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
) ++ readMetrics ++ writeMetrics
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1f2fdde..bfaf080 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
/**
* Take the first `limit` elements and collect them to a single partition.
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
- SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = readMetrics ++ writeMetrics
protected override def doExecute(): RDD[InternalRow] = {
val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
@@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec(
private lazy val writeMetrics =
SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
private lazy val readMetrics =
- SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
override lazy val metrics = readMetrics ++ writeMetrics
protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
index ff7941e..2c0ea80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -27,23 +27,23 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
* @param metrics All metrics in current SparkPlan. This param should not empty and
* contains all shuffle metrics defined in createShuffleReadMetrics.
*/
-private[spark] class SQLShuffleMetricsReporter(
+class SQLShuffleReadMetricsReporter(
tempMetrics: TempShuffleReadMetrics,
metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
private[this] val _remoteBlocksFetched =
- metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+ metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED)
private[this] val _localBlocksFetched =
- metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+ metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED)
private[this] val _remoteBytesRead =
- metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+ metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ)
private[this] val _remoteBytesReadToDisk =
- metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+ metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
private[this] val _localBytesRead =
- metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+ metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ)
private[this] val _fetchWaitTime =
- metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+ metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
private[this] val _recordsRead =
- metrics(SQLShuffleMetricsReporter.RECORDS_READ)
+ metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)
override def incRemoteBlocksFetched(v: Long): Unit = {
_remoteBlocksFetched.add(v)
@@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter(
}
}
-private[spark] object SQLShuffleMetricsReporter {
+object SQLShuffleReadMetricsReporter {
val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
val REMOTE_BYTES_READ = "remoteBytesRead"
@@ -88,8 +88,8 @@ private[spark] object SQLShuffleMetricsReporter {
* Create all shuffle read relative metrics and return the Map.
*/
def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
- REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
- LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+ REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks read"),
+ LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks read"),
REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
@@ -102,7 +102,7 @@ private[spark] object SQLShuffleMetricsReporter {
* @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
* @param metrics Shuffle write metrics in current SparkPlan.
*/
-private[spark] class SQLShuffleWriteMetricsReporter(
+class SQLShuffleWriteMetricsReporter(
metricsReporter: ShuffleWriteMetricsReporter,
metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter {
private[this] val _bytesWritten =
@@ -112,29 +112,29 @@ private[spark] class SQLShuffleWriteMetricsReporter(
private[this] val _writeTime =
metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
- override private[spark] def incBytesWritten(v: Long): Unit = {
+ override def incBytesWritten(v: Long): Unit = {
metricsReporter.incBytesWritten(v)
_bytesWritten.add(v)
}
- override private[spark] def decRecordsWritten(v: Long): Unit = {
+ override def decRecordsWritten(v: Long): Unit = {
metricsReporter.decBytesWritten(v)
_recordsWritten.set(_recordsWritten.value - v)
}
- override private[spark] def incRecordsWritten(v: Long): Unit = {
+ override def incRecordsWritten(v: Long): Unit = {
metricsReporter.incRecordsWritten(v)
_recordsWritten.add(v)
}
- override private[spark] def incWriteTime(v: Long): Unit = {
+ override def incWriteTime(v: Long): Unit = {
metricsReporter.incWriteTime(v)
_writeTime.add(v)
}
- override private[spark] def decBytesWritten(v: Long): Unit = {
+ override def decBytesWritten(v: Long): Unit = {
metricsReporter.decBytesWritten(v)
_bytesWritten.set(_bytesWritten.value - v)
}
}
-private[spark] object SQLShuffleWriteMetricsReporter {
+object SQLShuffleWriteMetricsReporter {
val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1ad5713..ca86922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
+import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
import org.apache.spark.sql.types._
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
new UnsafeRowSerializer(2))
val shuffled = new ShuffledRowRDD(
dependency,
- SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
+ SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
shuffled.count()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index f649549..47265df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -96,8 +96,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected1 = Map(
"records read" -> 2L,
- "local blocks fetched" -> 2L,
- "remote blocks fetched" -> 0L,
+ "local blocks read" -> 2L,
+ "remote blocks read" -> 0L,
"shuffle records written" -> 2L)
testSparkPlanMetrics(df, 1, Map(
2L -> (("HashAggregate", expected1(0))),
@@ -114,8 +114,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
val shuffleExpected2 = Map(
"records read" -> 4L,
- "local blocks fetched" -> 4L,
- "remote blocks fetched" -> 0L,
+ "local blocks read" -> 4L,
+ "remote blocks read" -> 0L,
"shuffle records written" -> 4L)
testSparkPlanMetrics(df2, 1, Map(
2L -> (("HashAggregate", expected2(0))),
@@ -175,8 +175,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
1L -> (("Exchange", Map(
"shuffle records written" -> 2L,
"records read" -> 2L,
- "local blocks fetched" -> 2L,
- "remote blocks fetched" -> 0L))),
+ "local blocks read" -> 2L,
+ "remote blocks read" -> 0L))),
0L -> (("ObjectHashAggregate", Map("number of output rows" -> 1L))))
)
@@ -187,8 +187,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
1L -> (("Exchange", Map(
"shuffle records written" -> 4L,
"records read" -> 4L,
- "local blocks fetched" -> 4L,
- "remote blocks fetched" -> 0L))),
+ "local blocks read" -> 4L,
+ "remote blocks read" -> 0L))),
0L -> (("ObjectHashAggregate", Map("number of output rows" -> 3L))))
)
}
@@ -216,8 +216,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
"number of output rows" -> 4L))),
2L -> (("Exchange", Map(
"records read" -> 4L,
- "local blocks fetched" -> 2L,
- "remote blocks fetched" -> 0L,
+ "local blocks read" -> 2L,
+ "remote blocks read" -> 0L,
"shuffle records written" -> 2L))))
)
}