You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/03/20 11:28:18 UTC

[spark] branch master updated: [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fece076a30 [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size
2fece076a30 is described below

commit 2fece076a30566bc152152fef587f5c1b4fca980
Author: aokolnychyi <ao...@apple.com>
AuthorDate: Mon Mar 20 19:27:56 2023 +0800

    [SPARK-42779][SQL][FOLLOWUP] Allow V2 writes to indicate advisory shuffle partition size
    
    ### What changes were proposed in this pull request?
    
    This PR addresses non-blocking comments for PR #40421.
    
    ### Why are the changes needed?
    
    These changes are needed to make sure the new logic only applies in expected cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
    
    Closes #40478 from aokolnychyi/spark-42779-followup.
    
    Authored-by: aokolnychyi <ao...@apple.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/adaptive/CoalesceShufflePartitions.scala | 14 +++++++++-----
 .../spark/sql/execution/adaptive/QueryStageExec.scala      |  2 +-
 2 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
index 6cca562b6ab..34399001c72 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala
@@ -122,12 +122,16 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
     }
   }
 
+  // data sources may request a particular advisory partition size for the final write stage
+  // if it happens, the advisory partition size will be set in ShuffleQueryStageExec
+  // only one shuffle stage is expected in such cases
   private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]): Long = {
-    val advisorySizes = shuffleStages.flatMap(_.shuffleStage.advisoryPartitionSize).toSet
-    if (advisorySizes.size == 1) {
-      advisorySizes.head
-    } else {
-      conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    val defaultAdvisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+    shuffleStages match {
+      case Seq(stage) =>
+        stage.shuffleStage.advisoryPartitionSize.getOrElse(defaultAdvisorySize)
+      case _ =>
+        defaultAdvisorySize
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 97a4bd617e9..a27f783215e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -180,7 +180,7 @@ case class ShuffleQueryStageExec(
       throw new IllegalStateException(s"wrong plan for shuffle stage:\n ${plan.treeString}")
   }
 
-  @transient val advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
+  def advisoryPartitionSize: Option[Long] = shuffle.advisoryPartitionSize
 
   @transient private lazy val shuffleFuture = shuffle.submitShuffleJob
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org