You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/06/09 16:09:45 UTC

[spark] branch branch-3.0 updated: [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b9a1cd8d [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
b9a1cd8d is described below

commit b9a1cd8d4bb1241e27fd78c04a892c10f1a62147
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Jun 9 16:07:22 2020 +0000

    [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
    
    ### What changes were proposed in this pull request?
    This PR makes `repartition`/`DISTRIBUTE BY` obey [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution is enabled.
    
    ### Why are the changes needed?
    To make `DISTRIBUTE BY` and `GROUP BY` shuffle into the same number of partitions.
    How to reproduce:
    ```scala
    spark.sql("CREATE TABLE spark_31220(id int)")
    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
    ```
    
    Before this PR:
    ```
    scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
    == Physical Plan ==
    AdaptiveSparkPlan(isFinalPlan=false)
    +- HashAggregate(keys=[id#5], functions=[])
       +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
          +- HashAggregate(keys=[id#5], functions=[])
             +- FileScan parquet default.spark_31220[id#5]
    
    scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
    == Physical Plan ==
    AdaptiveSparkPlan(isFinalPlan=false)
    +- Exchange hashpartitioning(id#5, 200), false, [id=#179]
       +- FileScan parquet default.spark_31220[id#5]
    ```
    After this PR:
    ```
    scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
    == Physical Plan ==
    AdaptiveSparkPlan(isFinalPlan=false)
    +- HashAggregate(keys=[id#5], functions=[])
       +- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
          +- HashAggregate(keys=[id#5], functions=[])
             +- FileScan parquet default.spark_31220[id#5]
    
    scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
    == Physical Plan ==
    AdaptiveSparkPlan(isFinalPlan=false)
    +- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
       +- FileScan parquet default.spark_31220[id#5]
    ```
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Unit test.
    
    Closes #27986 from wangyum/SPARK-31220.
    
    Authored-by: Yuming Wang <yu...@ebay.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 1d1eacde9d1b6fb75a20e4b909d221e70ad737db)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/sql/internal/SQLConf.scala    | 13 +++++++++----
 .../sql/execution/exchange/EnsureRequirements.scala      | 10 ++--------
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 16 ++++++++++++++++
 3 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b8b0f32..fd57095 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2765,7 +2765,15 @@ class SQLConf extends Serializable with Logging {
 
   def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
 
-  def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+  def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+
+  def numShufflePartitions: Int = {
+    if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
+      getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
+    } else {
+      defaultNumShufflePartitions
+    }
+  }
 
   def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
 
@@ -2778,9 +2786,6 @@ class SQLConf extends Serializable with Logging {
 
   def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
 
-  def initialShufflePartitionNum: Int =
-    getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
-
   def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
 
   def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 28ef793..3242ac2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -35,12 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
  * the input partition ordering requirements are met.
  */
 case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
-  private def defaultNumPreShufflePartitions: Int =
-    if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
-      conf.initialShufflePartitionNum
-    } else {
-      conf.numShufflePartitions
-    }
 
   private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
     val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
@@ -57,7 +51,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         BroadcastExchangeExec(mode, child)
       case (child, distribution) =>
         val numPartitions = distribution.requiredNumPartitions
-          .getOrElse(defaultNumPreShufflePartitions)
+          .getOrElse(conf.numShufflePartitions)
         ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
     }
 
@@ -95,7 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
         // expected number of shuffle partitions. However, if it's smaller than
         // `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
         // expected number of shuffle partitions.
-        math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+        math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
       } else {
         childrenNumPartitions.max
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ac0267a..6477ddc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -873,4 +873,20 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
+    Seq(true, false).foreach { enableAQE =>
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
+        SQLConf.SHUFFLE_PARTITIONS.key -> "6",
+        SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+        val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
+        if (enableAQE) {
+          assert(partitionsNum === 7)
+        } else {
+          assert(partitionsNum === 6)
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org