You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/20 11:28:18 UTC
[spark] branch master updated: [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2fece076a30 [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size
2fece076a30 is described below
commit 2fece076a30566bc152152fef587f5c1b4fca980
Author: aokolnychyi <ao...@apple.com>
AuthorDate: Mon Mar 20 19:27:56 2023 +0800
[SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size
### What changes were proposed in this pull request?
This PR addresses non-blocking comments for PR #40421.
### Why are the changes needed?
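These changes are needed to make sure the new logic only applies in expected cases: an advisory partition size requested by a data source is honored only when exactly one shuffle stage is being coalesced; in all other cases the configured default advisory partition size is used.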
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
Closes #40478 from aokolnychyi/spark-42779-followup.
Authored-by: aokolnychyi <ao...@apple.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/adaptive/CoalesceShufflePartitions.scala | 14 +++++++++-----
.../spark/sql/execution/adaptive/QueryStageExec.scala | 2 +-
2 files changed, 10 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 6cca562b6ab..34399001c72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -122,12 +122,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
}
}
+ // data sources may request a particular advisory partition size for the final write stage
+ // if it happens, the advisory partition size will be set in ShuffleQueryStageExec
+ // only one shuffle stage is expected in such cases
private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]): Long = {
- val advisorySizes = shuffleStages.flatMap(_.shuffleStage.advisoryPartitionSize).toSet
- if (advisorySizes.size == 1) {
- advisorySizes.head
- } else {
- conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ val defaultAdvisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ shuffleStages match {
+ case Seq(stage) =>
+ stage.shuffleStage.advisoryPartitionSize.getOrElse(defaultAdvisorySize)
+ case _ =>
+ defaultAdvisorySize
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 97a4bd617e9..a27f783215e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -180,7 +180,7 @@ case class ShuffleQueryStageExec(
throw new IllegalStateException(s"wrong plan for shuffle stage:\n ${plan.treeString}")
}
- @transient val advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
+ def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
@transient private lazy val shuffleFuture = shuffle.submitShuffleJob
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org