Posted to commits@spark.apache.org by we...@apache.org on 2021/07/20 12:50:20 UTC

[spark] branch branch-3.2 updated: [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 677104f  [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
677104f is described below

commit 677104f49531a5f5c214729b4d3e0ce91b4f4a64
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Tue Jul 20 20:48:35 2021 +0800

    [SPARK-36221][SQL] Make sure CustomShuffleReaderExec has at least one partition
    
    ### What changes were proposed in this pull request?
    
    * Add a non-empty partition check in `CustomShuffleReaderExec` (a minimal analogue is sketched below)
    * Make sure `OptimizeLocalShuffleReader` does not return an empty partition list
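
    A minimal standalone analogue of the new guard (a hypothetical sketch; the real `CustomShuffleReaderExec` lives in `org.apache.spark.sql.execution.adaptive` and its constructor is private, so the stand-in classes below are illustrative only):

    ```scala
    // Hypothetical stand-ins, not the Spark classes themselves.
    sealed trait ShufflePartitionSpec
    case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int)
      extends ShufflePartitionSpec

    case class ShuffleReader(partitionSpecs: Seq[ShufflePartitionSpec]) {
      // Fail fast at construction instead of surfacing a 0-partition RDD later.
      assert(partitionSpecs.nonEmpty, "requires at least one partition")
    }

    ShuffleReader(Seq.empty)  // throws java.lang.AssertionError at construction time
    ```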
    
    ### Why are the changes needed?
    
    Since SPARK-32083, AQE partition coalescing always returns at least one partition, so it is more robust for `CustomShuffleReaderExec` to assert that its partition specs are non-empty.
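
    To make that guarantee concrete, here is a simplified standalone sketch (the `coalesce` helper below is hypothetical, not Spark's `ShufflePartitionsUtil.coalescePartitions`) of why coalescing never yields an empty result: even when every shuffle partition is empty, a trailing range covering all reducers is still emitted.

    ```scala
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical sketch of AQE-style coalescing: group consecutive reducer
    // partitions until targetSize is reached. Mirroring the behavior since
    // SPARK-32083, it returns at least one (start, end) range whenever there
    // is at least one reducer partition, even if all partitions are empty.
    def coalesce(partitionSizes: Seq[Long], targetSize: Long): Seq[(Int, Int)] = {
      val ranges = ArrayBuffer.empty[(Int, Int)]
      var start = 0
      var acc = 0L
      partitionSizes.indices.foreach { i =>
        acc += partitionSizes(i)
        if (acc >= targetSize) {
          ranges += ((start, i + 1))
          start = i + 1
          acc = 0L
        }
      }
      // Close the trailing range; this also covers the all-empty case.
      if (start < partitionSizes.length || ranges.isEmpty) {
        ranges += ((start, partitionSizes.length))
      }
      ranges.toSeq
    }

    coalesce(Seq(0L, 0L, 0L), targetSize = 64L)  // Seq((0, 3)): one partition, never zero
    ```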
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Not needed.
    
    Closes #33431 from ulysses-you/non-empty-partition.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit b70c25881c8cbac299991a62457ad4373a11cfe4)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/adaptive/CustomShuffleReaderExec.scala     | 10 ++++++----
 .../sql/execution/adaptive/OptimizeLocalShuffleReader.scala  | 12 +++++-------
 2 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index cea3016..8975054 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -34,11 +34,14 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
  *
  * @param child           It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange
  *                        node during canonicalization.
- * @param partitionSpecs  The partition specs that defines the arrangement.
+ * @param partitionSpecs  The partition specs that define the arrangement; at least one
+ *                        partition is required.
  */
 case class CustomShuffleReaderExec private(
     child: SparkPlan,
     partitionSpecs: Seq[ShufflePartitionSpec]) extends UnaryExecNode {
+  assert(partitionSpecs.nonEmpty, "CustomShuffleReaderExec requires at least one partition")
+
   // If this reader is to read shuffle files locally, then all partition specs should be
   // `PartialMapperPartitionSpec`.
   if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
@@ -52,8 +55,7 @@ case class CustomShuffleReaderExec private(
     // If it is a local shuffle reader with one mapper per task, then the output partitioning is
     // the same as the plan before shuffle.
     // TODO this check is based on assumptions of callers' behavior but is sufficient for now.
-    if (partitionSpecs.nonEmpty &&
-        partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
+    if (partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
         partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size ==
           partitionSpecs.length) {
       child match {
@@ -111,7 +113,7 @@ case class CustomShuffleReaderExec private(
   }
 
   @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
-    if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) {
+    if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
       Some(partitionSpecs.map {
         case p: CoalescedPartitionSpec =>
           assert(p.dataSize.isDefined)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
index c91b999..2103145 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -68,13 +68,11 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
       shuffleStage: ShuffleQueryStageExec,
       advisoryParallelism: Option[Int]): Seq[ShufflePartitionSpec] = {
     val numMappers = shuffleStage.shuffle.numMappers
+    // ShuffleQueryStageExec.mapStats.isDefined guarantees numMappers > 0
+    assert(numMappers > 0)
     val numReducers = shuffleStage.shuffle.numPartitions
     val expectedParallelism = advisoryParallelism.getOrElse(numReducers)
-    val splitPoints = if (numMappers == 0) {
-      Seq.empty
-    } else {
-      equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
-    }
+    val splitPoints = equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
     (0 until numMappers).flatMap { mapIndex =>
       (splitPoints :+ numReducers).sliding(2).map {
         case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
@@ -127,8 +125,8 @@ object OptimizeLocalShuffleReader extends CustomShuffleReaderRule {
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
       s.mapStats.isDefined && supportLocalReader(s.shuffle)
-    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
-      s.mapStats.isDefined && partitionSpecs.nonEmpty && supportLocalReader(s.shuffle) &&
+    case CustomShuffleReaderExec(s: ShuffleQueryStageExec, _) =>
+      s.mapStats.isDefined && supportLocalReader(s.shuffle) &&
         s.shuffle.shuffleOrigin == ENSURE_REQUIREMENTS
     case _ => false
   }
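
For reference, a standalone sketch of the reducer-range splitting performed by the spec-building logic in `OptimizeLocalShuffleReader` above. The `equallyDivide` and `PartialMapperPartitionSpec` definitions below are illustrative stand-ins written to match the apparent contract of Spark's private helper and internal class, not copies of the sources:

```scala
// Assumed contract: return the start index of each of numBuckets roughly
// equal buckets over 0 until numElements (stand-in for the private helper).
def equallyDivide(numElements: Int, numBuckets: Int): Seq[Int] = {
  val base = numElements / numBuckets
  val remainder = numElements % numBuckets
  // The first `remainder` buckets hold base + 1 elements, the rest hold base.
  (0 until numBuckets).map(i => i * base + math.min(i, remainder))
}

// Stand-in for Spark's internal class of the same name.
case class PartialMapperPartitionSpec(mapIndex: Int, startReducerIndex: Int, endReducerIndex: Int)

val numMappers = 2            // asserted > 0 by the change above
val numReducers = 5
val expectedParallelism = 4   // advisoryParallelism.getOrElse(numReducers)

val splitPoints = equallyDivide(numReducers, math.max(1, expectedParallelism / numMappers))
// splitPoints == Seq(0, 3): two buckets over five reducers, sizes 3 and 2

val specs = (0 until numMappers).flatMap { mapIndex =>
  (splitPoints :+ numReducers).sliding(2).map {
    case Seq(start, end) => PartialMapperPartitionSpec(mapIndex, start, end)
  }
}
// specs: each mapper's output is split into reducer ranges [0,3) and [3,5),
// giving 4 local-read partitions in total.
```

Because `equallyDivide` always returns at least `Seq(0)` and `numMappers` is asserted positive, `splitPoints :+ numReducers` has at least two elements and the result is never empty, which is what allows the `partitionSpecs.nonEmpty` checks at the call sites to be dropped.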

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org