You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/20 12:50:20 UTC
[spark] branch branch-3.2 updated: [SPARK-36221][SQL] Make sure
CustomShuffleReaderExec has at least one partition
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 677104f [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
677104f is described below
commit 677104f49531a5f5c214729b4d3e0ce91b4f4a64
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Jul 20 20:48:35 2021 +0800
[SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
### What changes were proposed in this pull request?
* Add a non-empty partition check in `CustomShuffleReaderExec`
* Make sure `OptimizeLocalShuffleReader` doesn't return an empty list of partitions
### Why are the changes needed?
Since SPARK-32083, AQE coalescing always returns at least one partition, so adding a non-empty check in `CustomShuffleReaderExec` makes it more robust.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Not needed.
Closes #33431 from ulysses-you/non-empty-partition.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit b70c25881c8cbac299991a62457ad4373a11cfe4)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/execution/adaptive/CustomShuffleReaderExec.scala | 10 ++++++----
.../sql/execution/adaptive/OptimizeLocalShuffleReader.scala | 12 +++++-------
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index cea3016..8975054 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -34,11 +34,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
*
* @param child It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange
* node during canonicalization.
- * @param partitionSpecs The partition specs that defines the arrangement.
+ * @param partitionSpecs The partition specs that define the arrangement; requires at least one
+ * partition.
*/
case class CustomShuffleReaderExec private(
child: SparkPlan,
partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
+ assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least one partition")
+
// If this reader is to read shuffle files locally, then all partition specs should be
// `PartialMapperPartitionSpec`.
if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
@@ -52,8 +55,7 @@ case class CustomShuffleReaderExec private(
// If it is a local shuffle reader with one mapper per task, then the output partitioning is
// the same as the plan before shuffle.
// TODO this check is based on assumptions of callers' behavior but is sufficient for now.
- if (partitionSpecs.nonEmpty &&
- partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
+ if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size ==
partitionSpecs.length) {
child match {
@@ -111,7 +113,7 @@ case class CustomShuffleReaderExec private(
}
@transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
- if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) {
+ if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
Some(partitionSpecs.map {
case p: CoalescedPartitionSpec =>
assert(p.dataSize.isDefined)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index c91b999..2103145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -68,13 +68,11 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
shuffleStage: ShuffleQueryStageExec,
advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = {
val numMappers = shuffleStage.shuffle.numMappers
+ // ShuffleQueryStageExec.mapStats.isDefined guarantees numMappers > 0
+ assert(numMappers > 0)
val numReducers = shuffleStage.shuffle.numPartitions
val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
- val splitPoints = if (numMappers == 0) {
- Seq.empty
- } else {
- equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
- }
+ val splitPoints = equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
(0 until numMappers).flatMap { mapIndex =>
(splitPoints :+ numReducers).sliding(2).map {
case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
@@ -127,8 +125,8 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
case s: ShuffleQueryStageExec =>
s.mapStats.isDefined && supportLocalReader(s.shuffle)
- case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
- s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) &&
+ case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
+ s.mapStats.isDefined && supportLocalReader(s.shuffle) &&
s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS
case _ => false
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org