Posted to commits@spark.apache.org by li...@apache.org on 2020/02/25 22:12:44 UTC

[spark] branch branch-3.0 updated: [SPARK-30918][SQL] improve the splitting of skewed partitions

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b968cd3  [SPARK-30918][SQL] improve the splitting of skewed partitions
b968cd3 is described below

commit b968cd37796a5730fe5c2318d23a38416f550957
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Tue Feb 25 14:10:29 2020 -0800

    [SPARK-30918][SQL] improve the splitting of skewed partitions
    
    ### What changes were proposed in this pull request?
    
    Use the average size of the non-skewed partitions as the target size when splitting skewed partitions, instead of ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.
    
    ### Why are the changes needed?
    
    The goal of skew join optimization is to make the data distribution more even, so it makes more sense to use the average size of the non-skewed partitions as the target size.
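    
    A rough sketch of the new rule (not the patch's code; `sizes`, `medianSize`,
    `skewFactor`, and `threshold` are hypothetical stand-ins for the values Spark
    reads from MapOutputStatistics and SQLConf):
    
    ```scala
    object TargetSizeSketch {
      // A partition counts as skewed when it exceeds both median * factor
      // and the post-shuffle target size.
      def isSkewed(size: Long, medianSize: Long, skewFactor: Int, threshold: Long): Boolean =
        size > medianSize * skewFactor && size > threshold
    
      // Split target: the average of the non-skewed partitions, floored at
      // the post-shuffle target size.
      def targetSize(sizes: Array[Long], medianSize: Long, skewFactor: Int, threshold: Long): Long = {
        val nonSkew = sizes.filterNot(isSkewed(_, medianSize, skewFactor, threshold))
        math.max(threshold, nonSkew.sum / nonSkew.length)
      }
    
      def main(args: Array[String]): Unit = {
        val sizes = Array[Long](100, 120, 110, 5000) // the last partition is skewed
        println(targetSize(sizes, medianSize = 115, skewFactor = 10, threshold = 64)) // prints 110
      }
    }
    ```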
    
    ### Does this PR introduce any user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #27669 from cloud-fan/aqe.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Xiao Li <ga...@gmail.com>
    (cherry picked from commit 8f247e5d3682ad765bdbb9ea5a4315862c5a383c)
    Signed-off-by: Xiao Li <ga...@gmail.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 10 +---
 .../execution/adaptive/OptimizeSkewedJoin.scala    | 62 ++++++++++++++++++----
 .../adaptive/AdaptiveQueryExecSuite.scala          |  4 +-
 3 files changed, 54 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 674c6df..e6f7cfd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -432,19 +432,13 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
-  val ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD =
-    buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionSizeThreshold")
-      .doc("Configures the minimum size in bytes for a partition that is considered as a skewed " +
-        "partition in adaptive skewed join.")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("64MB")
-
   val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR =
     buildConf("spark.sql.adaptive.skewedJoinOptimization.skewedPartitionFactor")
       .doc("A partition is considered as a skewed partition if its size is larger than" +
         " this factor multiple the median partition size and also larger than " +
-        s" ${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}")
+        s" ${SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key}")
       .intConf
+      .checkValue(_ > 0, "The skew factor must be positive.")
       .createWithDefault(10)
 
   val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
index 578d2d7..d3cb864 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala
@@ -34,6 +34,30 @@ import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ShuffleExcha
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.internal.SQLConf
 
+/**
+ * A rule to optimize skewed joins to avoid straggler tasks whose share of data is significantly
+ * larger than that of the rest of the tasks.
+ *
+ * The general idea is to divide each skewed partition into smaller partitions and replicate its
+ * matching partition on the other side of the join so that they can run in parallel tasks.
+ * Note that when matching partitions from the left side and the right side both have skew,
+ * it will become a cartesian product of splits from left and right joining together.
+ *
+ * For example, assume the Sort-Merge join has 4 partitions:
+ * left:  [L1, L2, L3, L4]
+ * right: [R1, R2, R3, R4]
+ *
+ * Let's say L2, L4 and R3, R4 are skewed, and each of them gets split into 2 sub-partitions. This
+ * is scheduled to run 4 tasks at the beginning: (L1, R1), (L2, R2), (L3, R3), (L4, R4).
+ * This rule expands it to 9 tasks to increase parallelism:
+ * (L1, R1),
+ * (L2-1, R2), (L2-2, R2),
+ * (L3, R3-1), (L3, R3-2),
+ * (L4-1, R4-1), (L4-2, R4-1), (L4-1, R4-2), (L4-2, R4-2)
+ *
+ * Note that, when this rule is enabled, it also coalesces non-skewed partitions like
+ * `ReduceNumShufflePartitions` does.
+ */
 case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 
   private val ensureRequirements = EnsureRequirements(conf)
@@ -43,12 +67,12 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
 
   /**
    * A partition is considered as a skewed partition if its size is larger than the median
-   * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than
-   * spark.sql.adaptive.skewedPartitionSizeThreshold.
+   * partition size * ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR and also larger than
+   * SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.
    */
   private def isSkewed(size: Long, medianSize: Long): Boolean = {
     size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) &&
-      size > conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)
+      size > conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
   }
 
   private def medianSize(stats: MapOutputStatistics): Long = {
@@ -62,6 +86,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
   }
 
   /**
+   * The goal of skew join optimization is to make the data distribution more even. The target size
+   * to split skewed partitions is the average size of the non-skewed partitions, or the
+   * target post-shuffle partition size if the average size is smaller than that.
+   */
+  private def targetSize(stats: MapOutputStatistics, medianSize: Long): Long = {
+    val targetPostShuffleSize = conf.getConf(SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE)
+    val nonSkewSizes = stats.bytesByPartitionId.filterNot(isSkewed(_, medianSize))
+    // It's impossible that all the partitions are skewed, as we use median size to define skew.
+    assert(nonSkewSizes.nonEmpty)
+    math.max(targetPostShuffleSize, nonSkewSizes.sum / nonSkewSizes.length)
+  }
+
+  /**
    * Get the map size of the specific reduce shuffle Id.
    */
   private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = {
@@ -72,19 +109,19 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
   /**
    * Split the skewed partition based on the map size and the max split number.
    */
-  private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = {
+  private def getMapStartIndices(
+      stage: ShuffleQueryStageExec,
+      partitionId: Int,
+      targetSize: Long): Array[Int] = {
     val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId
     val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId)
-    val avgPartitionSize = mapPartitionSizes.sum / mapPartitionSizes.length
-    val advisoryPartitionSize = math.max(avgPartitionSize,
-      conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD))
     val partitionStartIndices = ArrayBuffer[Int]()
     partitionStartIndices += 0
     var i = 0
     var postMapPartitionSize = 0L
     while (i < mapPartitionSizes.length) {
       val nextMapPartitionSize = mapPartitionSizes(i)
-      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) {
+      if (i > 0 && postMapPartitionSize + nextMapPartitionSize > targetSize) {
         partitionStartIndices += i
         postMapPartitionSize = nextMapPartitionSize
       } else {
@@ -152,6 +189,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
         """.stripMargin)
       val canSplitLeft = canSplitLeftSide(joinType)
       val canSplitRight = canSplitRightSide(joinType)
+      val leftTargetSize = targetSize(leftStats, leftMedSize)
+      val rightTargetSize = targetSize(rightStats, rightMedSize)
 
       val leftSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
       val rightSidePartitions = mutable.ArrayBuffer.empty[ShufflePartitionSpec]
@@ -179,7 +218,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
             leftSkewDesc.addPartitionSize(leftSize)
             createSkewPartitions(
               partitionIndex,
-              getMapStartIndices(left, partitionIndex),
+              getMapStartIndices(left, partitionIndex, leftTargetSize),
               getNumMappers(left))
           } else {
             Seq(SinglePartitionSpec(partitionIndex))
@@ -189,7 +228,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
             rightSkewDesc.addPartitionSize(rightSize)
             createSkewPartitions(
               partitionIndex,
-              getMapStartIndices(right, partitionIndex),
+              getMapStartIndices(right, partitionIndex, rightTargetSize),
               getNumMappers(right))
           } else {
             Seq(SinglePartitionSpec(partitionIndex))
@@ -236,7 +275,8 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
       rightStats: MapOutputStatistics,
       nonSkewPartitionIndices: Seq[Int]): Seq[ShufflePartitionSpec] = {
     assert(nonSkewPartitionIndices.nonEmpty)
-    if (nonSkewPartitionIndices.length == 1) {
+    val shouldCoalesce = conf.getConf(SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED)
+    if (!shouldCoalesce || nonSkewPartitionIndices.length == 1) {
       Seq(SinglePartitionSpec(nonSkewPartitionIndices.head))
     } else {
       val startIndices = ShufflePartitionsCoalescer.coalescePartitions(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 379e17f..64566af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -583,7 +583,6 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100",
       SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
       withTempView("skewData1", "skewData2") {
         spark
@@ -609,8 +608,7 @@ class AdaptiveQueryExecSuite
     withSQLConf(
       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
-      SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "2000",
-      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") {
+      SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "2000") {
       withTempView("skewData1", "skewData2") {
         spark
           .range(0, 1000, 1, 10)
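
For reference, the split performed by getMapStartIndices above is a greedy
packing of consecutive map outputs: map outputs are added to the current
sub-partition until the next one would overshoot the target size, at which
point a new sub-partition starts. A minimal standalone sketch of that loop
(hypothetical inputs mapSizes and targetSize; not Spark's code):

    import scala.collection.mutable.ArrayBuffer

    object SplitSketch {
      def mapStartIndices(mapSizes: Array[Long], targetSize: Long): Array[Int] = {
        val starts = ArrayBuffer(0) // the first sub-partition always starts at map 0
        var running = 0L
        for (i <- mapSizes.indices) {
          val next = mapSizes(i)
          // Open a new sub-partition when adding the next map output would
          // overshoot the target (but never before the first map output).
          if (i > 0 && running + next > targetSize) {
            starts += i
            running = next
          } else {
            running += next
          }
        }
        starts.toArray
      }

      def main(args: Array[String]): Unit = {
        // With a target of 100 bytes, map outputs of 60, 60, 30 and 80 bytes
        // split into sub-partitions starting at map indices 0, 1 and 3.
        println(mapStartIndices(Array(60L, 60L, 30L, 80L), 100L).mkString(", "))
      }
    }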

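The task expansion in the new OptimizeSkewedJoin scaladoc follows from pairing
every split of a skewed partition with every split of its matching partition
on the other side. A toy sketch of that pairing (hypothetical names; not
Spark's code):

    object ExpansionSketch {
      def expand(leftSplits: Seq[String], rightSplits: Seq[String]): Seq[(String, String)] =
        for (l <- leftSplits; r <- rightSplits) yield (l, r)

      def main(args: Array[String]): Unit = {
        // L4 and R4 are both skewed and split in two, so their join cell
        // becomes a 2 x 2 cartesian product of sub-partitions: 4 tasks.
        println(expand(Seq("L4-1", "L4-2"), Seq("R4-1", "R4-2")))
        // Non-skewed L3 joined with split R3 yields (L3, R3-1) and (L3, R3-2).
        println(expand(Seq("L3"), Seq("R3-1", "R3-2")))
      }
    }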

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org