You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/03/08 08:53:33 UTC

[spark] branch master updated: [SPARK-38406][SQL] Improve perfermance of ShufflePartitionsUtil createSkewPartitionSpecs

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e1d00c  [SPARK-38406][SQL] Improve perfermance of ShufflePartitionsUtil createSkewPartitionSpecs
9e1d00c is described below

commit 9e1d00c521964a6bbf7e0126fd6dcf0020509420
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Mar 8 16:51:39 2022 +0800

    [SPARK-38406][SQL] Improve perfermance of ShufflePartitionsUtil createSkewPartitionSpecs
    
    ### What changes were proposed in this pull request?
    
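    Avoid unnecessary Scala syntactic sugar in a hot path: sum the map-partition sizes with a plain `while` loop instead of `startMapIndex.until(endMapIndex).map(...).sum`, and pass partition sizes to `splitSizeListByTargetSize` as `Array[Long]` instead of `Seq[Long]`.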
    
    ### Why are the changes needed?
    
    If a shuffle is skewed and has tens of thousands of map partitions and reduce partitions in AQE, the method `ShufflePartitionsUtil#createSkewPartitionSpecs` becomes very slow. Worse, it runs on the driver side.
    
    I tested in a local environment with 50,000 map partitions and 10,000 reduce partitions. The flame graph below shows that most of the CPU time is spent building `Seq`s:
    
    ![image](https://user-images.githubusercontent.com/12025282/156567065-6d9bffe9-3ab3-469d-92e8-da9687a8e5a8.png)
    
    Performance numbers:
    
    - before: 47,336 ms
    - after: 9,274 ms
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it only improves performance.
    
    ### How was this patch tested?
    
    Passed CI and tested performance locally.
    
    Closes #35722 from ulysses-you/SPARK-38406.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/adaptive/ShufflePartitionsUtil.scala   |  9 +++++++--
 .../spark/sql/execution/ShufflePartitionsUtilSuite.scala | 16 ++++++++--------
 2 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 0251f80..af689db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -317,7 +317,7 @@ object ShufflePartitionsUtil extends Logging {
    */
   // Visible for testing
   private[sql] def splitSizeListByTargetSize(
-      sizes: Seq[Long],
+      sizes: Array[Long],
       targetSize: Long,
       smallPartitionFactor: Double): Array[Int] = {
     val partitionStartIndices = ArrayBuffer[Int]()
@@ -394,7 +394,12 @@ object ShufflePartitionsUtil extends Logging {
         } else {
           mapStartIndices(i + 1)
         }
-        val dataSize = startMapIndex.until(endMapIndex).map(mapPartitionSizes(_)).sum
+        var dataSize = 0L
+        var mapIndex = startMapIndex
+        while (mapIndex < endMapIndex) {
+          dataSize += mapPartitionSizes(mapIndex)
+          mapIndex += 1
+        }
         PartialReducerPartitionSpec(reducerId, startMapIndex, endMapIndex, dataSize)
       })
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
index 9985665..da05373 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala
@@ -705,50 +705,50 @@ class ShufflePartitionsUtilSuite extends SparkFunSuite with LocalSparkContext {
 
     val smallPartitionFactor1 = ShufflePartitionsUtil.SMALL_PARTITION_FACTOR
     // merge the small partitions at the beginning/end
-    val sizeList1 = Seq[Long](15, 90, 15, 15, 15, 90, 15)
+    val sizeList1 = Array[Long](15, 90, 15, 15, 15, 90, 15)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList1, targetSize, smallPartitionFactor1).toSeq ==
       Seq(0, 2, 5))
 
     // merge the small partitions in the middle
-    val sizeList2 = Seq[Long](30, 15, 90, 10, 90, 15, 30)
+    val sizeList2 = Array[Long](30, 15, 90, 10, 90, 15, 30)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList2, targetSize, smallPartitionFactor1).toSeq ==
       Seq(0, 2, 4, 5))
 
     // merge small partitions if the partition itself is smaller than
     // targetSize * SMALL_PARTITION_FACTOR
-    val sizeList3 = Seq[Long](15, 1000, 15, 1000)
+    val sizeList3 = Array[Long](15, 1000, 15, 1000)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList3, targetSize, smallPartitionFactor1).toSeq ==
       Seq(0, 3))
 
     // merge small partitions if the combined size is smaller than
     // targetSize * MERGED_PARTITION_FACTOR
-    val sizeList4 = Seq[Long](35, 75, 90, 20, 35, 25, 35)
+    val sizeList4 = Array[Long](35, 75, 90, 20, 35, 25, 35)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList4, targetSize, smallPartitionFactor1).toSeq ==
       Seq(0, 2, 3))
 
     val smallPartitionFactor2 = 0.5
     // merge last two partition if their size is not bigger than smallPartitionFactor * target
-    val sizeList5 = Seq[Long](50, 50, 40, 5)
+    val sizeList5 = Array[Long](50, 50, 40, 5)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList5, targetSize, smallPartitionFactor2).toSeq ==
       Seq(0))
 
-    val sizeList6 = Seq[Long](40, 5, 50, 45)
+    val sizeList6 = Array[Long](40, 5, 50, 45)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList6, targetSize, smallPartitionFactor2).toSeq ==
       Seq(0))
 
     // do not merge
-    val sizeList7 = Seq[Long](50, 50, 10, 40, 5)
+    val sizeList7 = Array[Long](50, 50, 10, 40, 5)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList7, targetSize, smallPartitionFactor2).toSeq ==
       Seq(0, 2))
 
-    val sizeList8 = Seq[Long](10, 40, 5, 50, 50)
+    val sizeList8 = Array[Long](10, 40, 5, 50, 50)
     assert(ShufflePartitionsUtil.splitSizeListByTargetSize(
       sizeList8, targetSize, smallPartitionFactor2).toSeq ==
       Seq(0, 3))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org