You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/08/09 09:26:33 UTC

[spark] branch master updated: [SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a539d5  [SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE
9a539d5 is described below

commit 9a539d5846814f5fd5317b9d0b7eb1a41299f092
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Mon Aug 9 17:25:55 2021 +0800

    [SPARK-36430][SQL] Adaptively calculate the target size when coalescing shuffle partitions in AQE
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a performance regression introduced in https://github.com/apache/spark/pull/33172
    
    Before #33172, the target size was adaptively calculated based on the default parallelism of the Spark cluster. Sometimes the calculated size was very small, and #33172 set a min partition size to fix perf issues. Sometimes the calculated size was reasonable, such as dozens of MBs.
    
    After #33172, we no longer calculate the target size adaptively, and by default always coalesce the partitions into 1 MB. This can cause a perf regression if the adaptively calculated size would have been reasonable.
    
    This PR brings back the code that adaptively calculates the target size based on the default parallelism of the Spark cluster.
    
    ### Why are the changes needed?
    
    fix perf regression
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #33655 from cloud-fan/minor.
    
    Lead-authored-by: Wenchen Fan <we...@databricks.com>
    Co-authored-by: Wenchen Fan <cl...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    | 21 +++++-----
 .../adaptive/CoalesceShufflePartitions.scala       | 46 +++++++++++++---------
 .../execution/adaptive/ShufflePartitionsUtil.scala | 10 ++---
 .../adaptive/AdaptiveQueryExecSuite.scala          |  3 +-
 .../apache/spark/sql/internal/SQLConfSuite.scala   |  6 ++-
 5 files changed, 46 insertions(+), 40 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f85f745..a930f63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -480,6 +480,7 @@ object SQLConf {
       .doc("(Deprecated since Spark 3.0)")
       .version("1.6.0")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "advisoryPartitionSizeInBytes must be positive")
       .createWithDefaultString("64MB")
 
   val ADAPTIVE_EXECUTION_ENABLED = buildConf("spark.sql.adaptive.enabled")
@@ -526,28 +527,26 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
-  private val MIN_PARTITION_SIZE_KEY = "spark.sql.adaptive.coalescePartitions.minPartitionSize"
-
   val COALESCE_PARTITIONS_PARALLELISM_FIRST =
     buildConf("spark.sql.adaptive.coalescePartitions.parallelismFirst")
-      .doc("When true, Spark ignores the target size specified by " +
+      .doc("When true, Spark does not respect the target size specified by " +
         s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}' (default 64MB) when coalescing contiguous " +
-        "shuffle partitions, and only respect the minimum partition size specified by " +
-        s"'$MIN_PARTITION_SIZE_KEY' (default 1MB), to maximize the parallelism. " +
-        "This is to avoid performance regression when enabling adaptive query execution. " +
-        "It's recommended to set this config to false and respect the target size specified by " +
-        s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'.")
+        "shuffle partitions, but adaptively calculate the target size according to the default " +
+        "parallelism of the Spark cluster. The calculated size is usually smaller than the " +
+        "configured target size. This is to maximize the parallelism and avoid performance " +
+        "regression when enabling adaptive query execution. It's recommended to set this config " +
+        "to false and respect the configured target size.")
       .version("3.2.0")
       .booleanConf
       .createWithDefault(true)
 
   val COALESCE_PARTITIONS_MIN_PARTITION_SIZE =
     buildConf("spark.sql.adaptive.coalescePartitions.minPartitionSize")
-      .doc("The minimum size of shuffle partitions after coalescing. Its value can be at most " +
-        s"20% of '${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'. This is useful when the target size " +
-        "is ignored during partition coalescing, which is the default case.")
+      .doc("The minimum size of shuffle partitions after coalescing. This is useful when the " +
+        "adaptively calculated target size is too small during partition coalescing.")
       .version("3.2.0")
       .bytesConf(ByteUnit.BYTE)
+      .checkValue(_ > 0, "minPartitionSize must be positive")
       .createWithDefaultString("1MB")
 
   val COALESCE_PARTITIONS_MIN_PARTITION_NUM =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 75c53b4..5c14caa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
 import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, REBALANCE_PARTITIONS_BY_COL, REBALANCE_PARTITIONS_BY_NONE, REPARTITION_BY_COL, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.Utils
 
 /**
  * A rule to coalesce the shuffle partitions based on the map output statistics, which can
@@ -59,33 +60,40 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
     if (!shuffleStageInfos.forall(s => isSupported(s.shuffleStage.shuffle))) {
       plan
     } else {
-      // Ideally, this rule should simply coalesce partition w.r.t. the target size specified by
+      // Ideally, this rule should simply coalesce partitions w.r.t. the target size specified by
       // ADVISORY_PARTITION_SIZE_IN_BYTES (default 64MB). To avoid perf regression in AQE, this
-      // rule by default ignores the target size (set it to 0), and only respect the minimum
-      // partition size specified by COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
+      // rule by default tries to maximize the parallelism and sets the target size to
+      // `total shuffle size / Spark default parallelism`. In case the `Spark default parallelism`
+      // is too big, this rule also respects the minimum partition size specified by
+      // COALESCE_PARTITIONS_MIN_PARTITION_SIZE (default 1MB).
       // For history reason, this rule also need to support the config
-      // COALESCE_PARTITIONS_MIN_PARTITION_NUM: if it's set, we will respect both the target
-      // size and minimum partition number, no matter COALESCE_PARTITIONS_PARALLELISM_FIRST is true
-      // or false.
-      // TODO: remove the `minNumPartitions` parameter from
-      //       `ShufflePartitionsUtil.coalescePartitions` after we remove the config
-      //       COALESCE_PARTITIONS_MIN_PARTITION_NUM
-      val minPartitionNum = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM)
-      val advisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
-      // `minPartitionSize` can be at most 20% of `advisorySize`.
-      val minPartitionSize = math.min(
-        advisorySize / 5, conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE))
-      val parallelismFirst = conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)
-      val advisoryTargetSize = if (minPartitionNum.isEmpty && parallelismFirst) {
-        0
+      // COALESCE_PARTITIONS_MIN_PARTITION_NUM. We should remove this config in the future.
+      val minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse {
+        if (conf.getConf(SQLConf.COALESCE_PARTITIONS_PARALLELISM_FIRST)) {
+          // We fall back to Spark default parallelism if the minimum number of coalesced partitions
+          // is not set, so to avoid perf regressions compared to no coalescing.
+          session.sparkContext.defaultParallelism
+        } else {
+          // If we don't need to maximize the parallelism, we set `minNumPartitions` to 1, so that
+          // the specified advisory partition size will be respected.
+          1
+        }
+      }
+      val advisoryTargetSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+      val minPartitionSize = if (Utils.isTesting) {
+        // In the tests, we usually set the target size to a very small value that is even smaller
+        // than the default value of the min partition size. Here we also adjust the min partition
+        // size to be not larger than 20% of the target size, so that the tests don't need to set
+        // both configs all the time to check the coalescing behavior.
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE).min(advisoryTargetSize / 5)
       } else {
-        advisorySize
+        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
       }
       val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
         shuffleStageInfos.map(_.shuffleStage.mapStats),
         shuffleStageInfos.map(_.partitionSpecs),
         advisoryTargetSize = advisoryTargetSize,
-        minNumPartitions = conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM).getOrElse(1),
+        minNumPartitions = minNumPartitions,
         minPartitionSize = minPartitionSize)
 
       if (newPartitionSpecs.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
index 64f89b9..3609548 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala
@@ -56,13 +56,9 @@ object ShufflePartitionsUtil extends Logging {
     // If `minNumPartitions` is very large, it is possible that we need to use a value less than
     // `advisoryTargetSize` as the target size of a coalesced task.
     val totalPostShuffleInputSize = mapOutputStatistics.flatMap(_.map(_.bytesByPartitionId.sum)).sum
-    // The max at here is to make sure that when we have an empty table, we only have a single
-    // coalesced partition.
-    // There is no particular reason that we pick 16. We just need a number to prevent
-    // `maxTargetSize` from being set to 0.
-    val maxTargetSize = math.max(
-      math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong, 16)
-    val targetSize = math.min(maxTargetSize, advisoryTargetSize)
+    val maxTargetSize = math.ceil(totalPostShuffleInputSize / minNumPartitions.toDouble).toLong
+    // It's meaningless to make target size smaller than minPartitionSize.
+    val targetSize = maxTargetSize.min(advisoryTargetSize).max(minPartitionSize)
 
     val shuffleIds = mapOutputStatistics.flatMap(_.map(_.shuffleId)).mkString(", ")
     logInfo(s"For shuffle($shuffleIds), advisory target size: $advisoryTargetSize, " +
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index d38a641..4471fda 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1691,7 +1691,8 @@ class AdaptiveQueryExecSuite
 
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
       SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
-      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2258",
+      // Pick a small value so that no coalesce can happen.
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
       SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_NUM.key -> "1",
       SQLConf.SHUFFLE_PARTITIONS.key -> "2") {
       val df = spark.sparkContext.parallelize(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 6f2452a..5e661a0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -259,8 +259,10 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
     spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "1g")
     assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === 1073741824)
 
-    spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
-    assert(spark.conf.get(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES) === -1)
+    // test negative value
+    intercept[IllegalArgumentException] {
+      spark.conf.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "-1")
+    }
 
     // Test overflow exception
     intercept[IllegalArgumentException] {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org