Posted to commits@spark.apache.org by we...@apache.org on 2022/11/09 08:02:47 UTC

[spark] branch branch-3.2 updated: [SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f0f83b518d0 [SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering
f0f83b518d0 is described below

commit f0f83b518d0bbfccd4ff6414d372bc2ad236f0ff
Author: Enrico Minack <gi...@enrico.minack.dev>
AuthorDate: Wed Nov 9 15:59:54 2022 +0800

    [SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering
    
    The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3. Spark 3.4 does not need this because `FileFormatWriter` gets the final plan.
    
    `FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering (Spark 3.0 to 3.3), so `FileFormatWriter` enforces its required ordering (e.g. by column `"a"`) even when the plan already provides a compatible ordering (e.g. by columns `"a", "b"`). If that extra sort spills, the finer ordering (by columns `"a", "b"`) is broken (see SPARK-40588).
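
    For illustration, the compatibility check is prefix-based: a plan that is already ordered by `"a", "b"` satisfies a required ordering by `"a"`. A simplified sketch of that check (the real code compares Catalyst expressions with `semanticEquals` rather than plain equality):

    ```scala
    // Simplified sketch: the required ordering must be a prefix of the
    // ordering the plan actually provides.
    def orderingMatched[A](required: Seq[A], actual: Seq[A]): Boolean =
      required.length <= actual.length &&
        required.zip(actual).forall { case (r, a) => r == a }

    orderingMatched(Seq("a"), Seq("a", "b"))  // true: no extra sort needed
    orderingMatched(Seq("a", "b"), Seq("a"))  // false: a sort is inserted
    ```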
    
    This fixes SPARK-40588, which was introduced in Spark 3.0, and restores the behaviour of Spark 2.4.
    
    The final plan that is written to files cannot be extracted from `FileFormatWriter`. The bug explained in [SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be asserted on the result files when spilling occurs, which is very hard to control in a unit test scenario.
    
    Therefore, this was tested manually. The [example to reproduce this issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032) given in SPARK-40588 now produces sorted files.
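
    For reference, a minimal sketch along the lines of that reproduction (path, sizes and partition counts are illustrative; the authoritative steps are in the linked JIRA comment):

    ```scala
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder.master("local[2]").getOrCreate()
    import spark.implicits._

    val days = spark.range(2).select($"id".as("day"))
    val ids = spark.range(10000000)

    // The written plan provides ordering (day, id); the writer only
    // requires ordering by the partition column (day).
    days.crossJoin(ids)
      .repartition($"day")
      .sortWithinPartitions($"day", $"id")
      .write
      .partitionBy("day")
      .csv("/tmp/spark-40588")
    ```

    Before this change, the writer's extra `Sort` by `day` could spill and leave `id` unsorted within each output file; with it, no extra sort is inserted and the files stay sorted by `id`.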
    
    The actual plan written into the files changed from
    
    ```
    Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0
    +- AdaptiveSparkPlan isFinalPlan=false
       +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
          +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30]
             +- BroadcastNestedLoopJoin BuildLeft, Inner
                :- BroadcastExchange IdentityBroadcastMode, [id=#28]
                :  +- Project [id#0L AS day#2L]
                :     +- Range (0, 2, step=1, splits=2)
                +- Range (0, 10000000, step=1, splits=2)
    ```
    
    where `FileFormatWriter` enforces its own ordering with the outer `Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0`, to
    
    ```
    *(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0
    +- AQEShuffleRead coalesced
       +- ShuffleQueryStage 1
          +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [id=#68]
             +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner
                :- BroadcastQueryStage 0
                :  +- BroadcastExchange IdentityBroadcastMode, [id=#42]
                :     +- *(1) Project [id#0L AS day#2L]
                :        +- *(1) Range (0, 2, step=1, splits=2)
                +- *(2) Range (0, 1000000, step=1, splits=2)
    ```
    
    where the sort given by the user is now the outermost sort.
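
    A throwaway manual check, assuming the output path and CSV format from the sketch above (each part file should then contain ascending ids):

    ```scala
    import java.io.File
    import scala.io.Source

    // Hypothetical check: every CSV part file under a partition directory
    // should contain ids in ascending order.
    for {
      f <- new File("/tmp/spark-40588/day=0").listFiles
      if f.getName.endsWith(".csv")
    } {
      val ids = Source.fromFile(f).getLines().map(_.toLong).toVector
      assert(ids == ids.sorted, s"${f.getName} is not sorted")
    }
    ```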
    
    Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan.
    
    Authored-by: Enrico Minack <gi...@enrico.minack.dev>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit f0cad7ad6c2618d2d0d8c8598bbd54c2ca366b6b)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/adaptive/AdaptiveSparkPlanExec.scala |  2 ++
 .../sql/execution/datasources/FileFormatWriter.scala   | 18 ++++++++++++++----
 2 files changed, 16 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 5157c169ef9..1e7cfc474c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -217,6 +217,8 @@ case class AdaptiveSparkPlanExec(
       .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
   }
 
+  def finalPhysicalPlan: SparkPlan = withFinalPlanUpdate(identity)
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index cd3d101ac26..a9d4d4208f3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, SQLExecution, UnsafeExternalRowSorter}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.unsafe.types.UTF8String
@@ -160,8 +161,17 @@ object FileFormatWriter extends Logging {
 
     // We should first sort by partition columns, then bucket id, and finally sorting columns.
     val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns
+
+    // SPARK-40588: plan may contain an AdaptiveSparkPlanExec, which does not know
+    // its final plan's ordering, so we have to materialize that plan first
+    def materializeAdaptiveSparkPlan(plan: SparkPlan): SparkPlan = plan match {
+      case a: AdaptiveSparkPlanExec => a.finalPhysicalPlan
+      case p: SparkPlan => p.withNewChildren(p.children.map(materializeAdaptiveSparkPlan))
+    }
+    val materializedPlan = materializeAdaptiveSparkPlan(empty2NullPlan)
+
     // the sort order doesn't matter
-    val actualOrdering = empty2NullPlan.outputOrdering.map(_.child)
+    val actualOrdering = materializedPlan.outputOrdering.map(_.child)
     val orderingMatched = if (requiredOrdering.length > actualOrdering.length) {
       false
     } else {
@@ -183,7 +193,7 @@ object FileFormatWriter extends Logging {
 
     try {
       val (rdd, concurrentOutputWriterSpec) = if (orderingMatched) {
-        (empty2NullPlan.execute(), None)
+        (materializedPlan.execute(), None)
       } else {
         // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
         // the physical plan may have different attribute ids due to optimizer removing some
@@ -193,12 +203,12 @@ object FileFormatWriter extends Logging {
         val sortPlan = SortExec(
           orderingExpr,
           global = false,
-          child = empty2NullPlan)
+          child = materializedPlan)
 
         val maxWriters = sparkSession.sessionState.conf.maxConcurrentOutputFileWriters
         val concurrentWritersEnabled = maxWriters > 0 && sortColumns.isEmpty
         if (concurrentWritersEnabled) {
-          (empty2NullPlan.execute(),
+          (materializedPlan.execute(),
             Some(ConcurrentOutputWriterSpec(maxWriters, () => sortPlan.createSorter())))
         } else {
           (sortPlan.execute(), None)

