Posted to commits@spark.apache.org by ji...@apache.org on 2020/04/17 20:21:45 UTC
[spark] branch master updated: [SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle
This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new db7b865 [SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle
db7b865 is described below
commit db7b8651a19d5a749a9f0b4e8fb517e6994921c2
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Apr 17 13:20:34 2020 -0700
[SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle
### What changes were proposed in this pull request?
A follow-up of https://github.com/apache/spark/pull/28175:
1. use a mutable collection to store the driver metrics
2. don't send size metrics if there are no map stats, since the UI will display the size as 0 when there is no data
3. calculate the partition data sizes separately, to make the code easier to read (see the sketch after this list)
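For illustration only, a minimal self-contained Scala sketch of changes 1 and 3. The types and metric keys here (`PartitionSpec`, `CoalescedSpec`, `PartialReducerSpec`, plain strings for metric ids) are simplified stand-ins, not Spark's actual `CoalescedPartitionSpec`/`PartialReducerPartitionSpec` classes or the `SQLMetric` API:

```scala
// Hedged sketch only: these types and string metric keys are simplified
// stand-ins for Spark's partition specs and SQLMetric ids.
import scala.collection.mutable.ArrayBuffer

sealed trait PartitionSpec
case class CoalescedSpec(startReducer: Int, endReducer: Int) extends PartitionSpec
case class PartialReducerSpec(reducerIndex: Int, dataSize: Long) extends PartitionSpec

object ShuffleMetricsSketch {
  // Change 3: compute the per-partition data sizes in one dedicated place.
  def partitionDataSizes(
      specs: Seq[PartitionSpec],
      bytesByPartitionId: Array[Long]): Seq[Long] = specs.map {
    case CoalescedSpec(start, end) =>
      // A coalesced partition covers reducer ids [start, end); sum their bytes.
      start.until(end).map(i => bytesByPartitionId(i)).sum
    case p: PartialReducerSpec => p.dataSize
  }

  def main(args: Array[String]): Unit = {
    val bytesByPartitionId = Array(10L, 20L, 30L, 40L)
    val specs = Seq(CoalescedSpec(0, 2), PartialReducerSpec(2, 15L))

    // Change 1: accumulate updates in a mutable buffer instead of
    // rebuilding an immutable Seq on every append.
    val driverAccumUpdates = ArrayBuffer.empty[(String, Long)]
    driverAccumUpdates += ("numPartitions" -> specs.length.toLong)
    val sizes = partitionDataSizes(specs, bytesByPartitionId)
    driverAccumUpdates ++= sizes.map("partitionDataSize" -> _)

    println(driverAccumUpdates)
    // ArrayBuffer((numPartitions,2), (partitionDataSize,30), (partitionDataSize,15))
  }
}
```

Appending to an `ArrayBuffer` mutates one buffer in place, whereas the old `driverAccumUpdates :+ elem` built a fresh immutable sequence on every append.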
### Why are the changes needed?
code simplification
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
existing tests
Closes #28240 from cloud-fan/refactor.
Authored-by: Wenchen Fan <we...@databricks.com>
Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
.../adaptive/CustomShuffleReaderExec.scala | 50 ++++++++++------------
1 file changed, 22 insertions(+), 28 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 68f20bc..6450d49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -97,14 +97,27 @@ case class CustomShuffleReaderExec private(
case _ => None
}
+ @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
+ if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
+ val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
+ Some(partitionSpecs.map {
+ case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+ startReducerIndex.until(endReducerIndex).map(bytesByPartitionId).sum
+ case p: PartialReducerPartitionSpec => p.dataSize
+ case p => throw new IllegalStateException("unexpected " + p)
+ })
+ } else {
+ None
+ }
+ }
+
private def sendDriverMetrics(): Unit = {
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
- var driverAccumUpdates: Seq[(Long, Long)] = Seq.empty
+ val driverAccumUpdates = ArrayBuffer.empty[(Long, Long)]
val numPartitionsMetric = metrics("numPartitions")
numPartitionsMetric.set(partitionSpecs.length)
- driverAccumUpdates = driverAccumUpdates :+
- (numPartitionsMetric.id, partitionSpecs.length.toLong)
+ driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
if (hasSkewedPartition) {
val skewedMetric = metrics("numSkewedPartitions")
@@ -112,33 +125,14 @@ case class CustomShuffleReaderExec private(
case p: PartialReducerPartitionSpec => p.reducerIndex
}.distinct.length
skewedMetric.set(numSkewedPartitions)
- driverAccumUpdates = driverAccumUpdates :+ (skewedMetric.id, numSkewedPartitions.toLong)
+ driverAccumUpdates += (skewedMetric.id -> numSkewedPartitions.toLong)
}
- if(!isLocalReader) {
- val partitionMetrics = metrics("partitionDataSize")
- val mapStats = shuffleStage.get.mapStats
-
- if (mapStats.isEmpty) {
- partitionMetrics.set(0)
- driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, 0L)
- } else {
- var sum = 0L
- partitionSpecs.foreach {
- case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
- val dataSize = startReducerIndex.until(endReducerIndex).map(
- mapStats.get.bytesByPartitionId(_)).sum
- driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, dataSize)
- sum += dataSize
- case p: PartialReducerPartitionSpec =>
- driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, p.dataSize)
- sum += p.dataSize
- case p => throw new IllegalStateException("unexpected " + p)
- }
-
- // Set sum value to "partitionDataSize" metric.
- partitionMetrics.set(sum)
- }
+ partitionDataSizes.foreach { dataSizes =>
+ val partitionDataSizeMetrics = metrics("partitionDataSize")
+ driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
+ // Set sum value to "partitionDataSize" metric.
+ partitionDataSizeMetrics.set(dataSizes.sum)
}
SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates)
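As an illustration of change 2, a hedged standalone sketch (here `MapStats` is a stand-in for Spark's `MapOutputStatistics`, and the method names are invented for the example): modeling the optional map stats with `Option` and driving the reporting with `foreach` means no size metric is emitted at all when stats are absent, and the UI then shows the size as 0 on its own, instead of the old code explicitly posting a 0 value.

```scala
// Simplified stand-in for Spark's MapOutputStatistics; illustrative only.
case class MapStats(bytesByPartitionId: Array[Long])

object OptionalSizeMetricsSketch {
  // Change 2: return None when there are no map stats, so callers send nothing.
  def partitionDataSizes(mapStats: Option[MapStats]): Option[Seq[Long]] =
    mapStats.map(_.bytesByPartitionId.toSeq)

  def reportSizes(mapStats: Option[MapStats]): Unit = {
    // When mapStats is None, foreach is a no-op: no size metric is sent,
    // rather than explicitly posting 0 as the pre-refactor code did.
    partitionDataSizes(mapStats).foreach { sizes =>
      println(s"partitionDataSize total = ${sizes.sum}")
    }
  }

  def main(args: Array[String]): Unit = {
    reportSizes(None)                                  // prints nothing
    reportSizes(Some(MapStats(Array(10L, 20L, 30L))))  // partitionDataSize total = 60
  }
}
```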