Posted to commits@spark.apache.org by we...@apache.org on 2020/06/09 16:09:45 UTC
[spark] branch branch-3.0 updated: [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b9a1cd8d [SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
b9a1cd8d is described below
commit b9a1cd8d4bb1241e27fd78c04a892c10f1a62147
Author: Yuming Wang <yu...@ebay.com>
AuthorDate: Tue Jun 9 16:07:22 2020 +0000
[SPARK-31220][SQL] repartition obeys initialPartitionNum when adaptiveExecutionEnabled
### What changes were proposed in this pull request?
This PR makes `repartition`/`DISTRIBUTE BY` obey [initialPartitionNum](https://github.com/apache/spark/blob/af4248b2d661d04fec89b37857a47713246d9465/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L446-L455) when adaptive execution is enabled.
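In effect, the shuffle partition number is now resolved with the fallback logic added to `SQLConf` in the diff below. A minimal standalone sketch of that logic (the function name and parameter list here are illustrative, not Spark's API):
```scala
// Illustrative sketch: how the effective shuffle partition number is
// resolved after this change.
def effectiveNumShufflePartitions(
    adaptiveEnabled: Boolean,          // spark.sql.adaptive.enabled
    coalesceEnabled: Boolean,          // spark.sql.adaptive.coalescePartitions.enabled
    initialPartitionNum: Option[Int],  // spark.sql.adaptive.coalescePartitions.initialPartitionNum
    defaultPartitions: Int             // spark.sql.shuffle.partitions
): Int = {
  if (adaptiveEnabled && coalesceEnabled) {
    // AQE with coalescing: start from the initial partition number if set.
    initialPartitionNum.getOrElse(defaultPartitions)
  } else {
    // Otherwise fall back to the classic default.
    defaultPartitions
  }
}
```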
### Why are the changes needed?
To make `DISTRIBUTE BY`/`GROUP BY` use the same partition number.
How to reproduce:
```scala
spark.sql("CREATE TABLE spark_31220(id int)")
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.initialPartitionNum=1000")
```
Before this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
+- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
+- HashAggregate(keys=[id#5], functions=[])
+- FileScan parquet default.spark_31220[id#5]
scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 200), false, [id=#179]
+- FileScan parquet default.spark_31220[id#5]
```
After this PR:
```
scala> spark.sql("SELECT id from spark_31220 GROUP BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- HashAggregate(keys=[id#5], functions=[])
+- Exchange hashpartitioning(id#5, 1000), true, [id=#171]
+- HashAggregate(keys=[id#5], functions=[])
+- FileScan parquet default.spark_31220[id#5]
scala> spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
== Physical Plan ==
AdaptiveSparkPlan(isFinalPlan=false)
+- Exchange hashpartitioning(id#5, 1000), false, [id=#179]
+- FileScan parquet default.spark_31220[id#5]
```
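The same behavior can be observed through the DataFrame API. A quick sketch mirroring the unit test added below (with the configs set above, the repartition is planned with the initial partition number):
```scala
import spark.implicits._  // for $"id"; pre-imported in spark-shell

// With AQE on and initialPartitionNum=1000 (set above), a column
// repartition now picks up the initial partition number.
val partitionsNum = spark.range(10).repartition($"id").rdd.getNumPartitions
// partitionsNum == 1000 here; without AQE it equals spark.sql.shuffle.partitions.
```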
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Unit test.
Closes #27986 from wangyum/SPARK-31220.
Authored-by: Yuming Wang <yu...@ebay.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 1d1eacde9d1b6fb75a20e4b909d221e70ad737db)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../scala/org/apache/spark/sql/internal/SQLConf.scala | 13 +++++++++----
.../sql/execution/exchange/EnsureRequirements.scala | 10 ++--------
.../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 16 ++++++++++++++++
3 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index b8b0f32..fd57095 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2765,7 +2765,15 @@ class SQLConf extends Serializable with Logging {
def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)
- def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+ def defaultNumShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
+
+ def numShufflePartitions: Int = {
+ if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
+ getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(defaultNumShufflePartitions)
+ } else {
+ defaultNumShufflePartitions
+ }
+ }
def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)
@@ -2778,9 +2786,6 @@ class SQLConf extends Serializable with Logging {
def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)
- def initialShufflePartitionNum: Int =
- getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)
-
def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)
def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 28ef793..3242ac2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -35,12 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
* the input partition ordering requirements are met.
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
- private def defaultNumPreShufflePartitions: Int =
- if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
- conf.initialShufflePartitionNum
- } else {
- conf.numShufflePartitions
- }
private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
@@ -57,7 +51,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
BroadcastExchangeExec(mode, child)
case (child, distribution) =>
val numPartitions = distribution.requiredNumPartitions
- .getOrElse(defaultNumPreShufflePartitions)
+ .getOrElse(conf.numShufflePartitions)
ShuffleExchangeExec(distribution.createPartitioning(numPartitions), child)
}
@@ -95,7 +89,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
// expected number of shuffle partitions. However, if it's smaller than
// `conf.numShufflePartitions`, we pick `conf.numShufflePartitions` as the
// expected number of shuffle partitions.
- math.max(nonShuffleChildrenNumPartitions.max, conf.numShufflePartitions)
+ math.max(nonShuffleChildrenNumPartitions.max, conf.defaultNumShufflePartitions)
} else {
childrenNumPartitions.max
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ac0267a..6477ddc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -873,4 +873,20 @@ class AdaptiveQueryExecSuite
}
}
}
+
+ test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
+ Seq(true, false).foreach { enableAQE =>
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
+ SQLConf.SHUFFLE_PARTITIONS.key -> "6",
+ SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
+ val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
+ if (enableAQE) {
+ assert(partitionsNum === 7)
+ } else {
+ assert(partitionsNum === 6)
+ }
+ }
+ }
+ }
}
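As a usage note (a sketch, assuming the table and configs from the repro above): leaving `initialPartitionNum` unset, or disabling partition coalescing, keeps the `spark.sql.shuffle.partitions` default even with AQE enabled, per the `coalesceShufflePartitionsEnabled` check in the new `numShufflePartitions`.
```scala
spark.sql("set spark.sql.adaptive.enabled=true")
spark.sql("set spark.sql.adaptive.coalescePartitions.enabled=false")
// With coalescing disabled, the fallback branch applies and the exchange is
// planned with spark.sql.shuffle.partitions (200 by default).
spark.sql("SELECT id from spark_31220 DISTRIBUTE BY id").explain
```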