You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/08 08:53:33 UTC
[spark] branch master updated: [SPARK-38406][SQL] Improve perfermance of ShufflePartitionsUtil createSkewPartitionSpecs
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9e1d00c [SPARK-38406][SQL] Improve perfermance of ShufflePartitionsUtil createSkewPartitionSpecs
9e1d00c is described below
commit 9e1d00c521964a6bbf7e0126fd6dcf0020509420
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Mar 8 16:51:39 2022 +0800
[SPARK-38406][SQL] Improve perfermance of ShufflePartitionsUtil createSkewPartitionSpecs
### What changes were proposed in this pull request?
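Avoid unnecessary Scala syntactic sugar in a hot path: pass the size list as an `Array[Long]` instead of a `Seq[Long]`, and sum map-partition sizes with a plain `while` loop instead of `until(...).map(...).sum`.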
### Why are the changes needed?
If a shuffle is skewed and has tens of thousands of map and reduce partitions in AQE, the method `ShufflePartitionsUtil#createSkewPartitionSpecs` becomes very slow. Worse, it runs on the driver side.
I tested in a local environment with 50,000 map partitions and 10,000 reduce partitions. The flame graph below shows that much of the CPU time is spent building intermediate Seq collections:
![image](https://user-images.githubusercontent.com/12025282/156567065-6d9bffe9-3ab3-469d-92e8-da9687a8e5a8.png)
Performance numbers:
- before: 47,336 ms
- after: 9,274 ms
### Does this PR introduce _any_ user-facing change?
No, this only improves performance.
### How was this patch tested?
Passed CI, and performance was tested locally.
Closes #35722 from ulysses-you/SPARK-38406.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/adaptive/ShufflePartitionsUtil.scala | 9 +++++++--
.../spark/sql/execution/ShufflePartitionsUtilSuite.scala | 16 ++++++++--------
2 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 0251f80..af689db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -317,7 +317,7 @@ object ShufflePartitionsUtil extends Logging {
*/
// Visible for testing
private[sql] def splitSizeListByTargetSize(
- sizes: Seq[Long],
+ sizes: Array[Long],
targetSize: Long,
smallPartitionFactor: Double): Array[Int] = {
val partitionStartIndices = ArrayBuffer[Int]()
@@ -394,7 +394,12 @@ object ShufflePartitionsUtil extends Logging {
} else {
mapStartIndices(i + 1)
}
- val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+ var dataSize = 0L
+ var mapIndex = startMapIndex
+ while (mapIndex < endMapIndex) {
+ dataSize += mapPartitionSizes(mapIndex)
+ mapIndex += 1
+ }
PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
})
} else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index 9985665..da05373 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -705,50 +705,50 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
val smallPartitionFactor1 = ShufflePartitionsUtil.SMALL_PARTITION_FACTOR
// merge the small partitions at the beginning/end
- val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15)
+ val sizeList1 = Array[Long](15, 90, 15, 15, 15, 90, 15)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList1, targetSize, smallPartitionFactor1).toSeq ==
Seq(0, 2, 5))
// merge the small partitions in the middle
- val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30)
+ val sizeList2 = Array[Long](30, 15, 90, 10, 90, 15, 30)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList2, targetSize, smallPartitionFactor1).toSeq ==
Seq(0, 2, 4, 5))
// merge small partitions if the partition itself is smaller than
// targetSize * SMALL_PARTITION_FACTOR
- val sizeList3 = Seq[Long](15, 1000, 15, 1000)
+ val sizeList3 = Array[Long](15, 1000, 15, 1000)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList3, targetSize, smallPartitionFactor1).toSeq ==
Seq(0, 3))
// merge small partitions if the combined size is smaller than
// targetSize * MERGED_PARTITION_FACTOR
- val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 25, 35)
+ val sizeList4 = Array[Long](35, 75, 90, 20, 35, 25, 35)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList4, targetSize, smallPartitionFactor1).toSeq ==
Seq(0, 2, 3))
val smallPartitionFactor2 = 0.5
// merge last two partition if their size is not bigger than smallPartitionFactor * target
- val sizeList5 = Seq[Long](50, 50, 40, 5)
+ val sizeList5 = Array[Long](50, 50, 40, 5)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList5, targetSize, smallPartitionFactor2).toSeq ==
Seq(0))
- val sizeList6 = Seq[Long](40, 5, 50, 45)
+ val sizeList6 = Array[Long](40, 5, 50, 45)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList6, targetSize, smallPartitionFactor2).toSeq ==
Seq(0))
// do not merge
- val sizeList7 = Seq[Long](50, 50, 10, 40, 5)
+ val sizeList7 = Array[Long](50, 50, 10, 40, 5)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList7, targetSize, smallPartitionFactor2).toSeq ==
Seq(0, 2))
- val sizeList8 = Seq[Long](10, 40, 5, 50, 50)
+ val sizeList8 = Array[Long](10, 40, 5, 50, 50)
assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
sizeList8, targetSize, smallPartitionFactor2).toSeq ==
Seq(0, 3))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org