You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/25 10:36:07 UTC

[spark] branch branch-3.3 updated: [SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 7603f8d0aeb [SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort
7603f8d0aeb is described below

commit 7603f8d0aeb72e1989643fc9911edca0744087ad
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Jul 25 18:35:03 2022 +0800

    [SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort
    
    ### What changes were proposed in this pull request?
    
    Correct `EliminateSorts` as follows:
    
    - If the upper sort is global then we can remove the global or local sort recursively.
    - If the upper sort is local then we can only remove the local sort recursively.
    
    ### Why are the changes needed?
    
    If a global sort is below a local sort, we should not remove the global sort, because removing it can change the output partitioning.
    
    This issue is going to get worse now that we have pulled the V1 write sort out to the logical side.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, bug fix.
    
    ### How was this patch tested?
    
    Added a test.
    
    Closes #37250 from ulysses-you/remove-sort.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 5dca26d514a150bda58f7c4919624c9638498fec)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 26 +++++++++++++++++-----
 .../catalyst/optimizer/EliminateSortsSuite.scala   | 16 +++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d3a6065f726..827df04443e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1445,21 +1445,31 @@ object EliminateSorts extends Rule[LogicalPlan] {
       }
     case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
       applyLocally.lift(child).getOrElse(child)
-    case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
+    case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))
     case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
-      j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
+      j.copy(left = recursiveRemoveSort(originLeft, true),
+        right = recursiveRemoveSort(originRight, true))
     case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
-      g.copy(child = recursiveRemoveSort(originChild))
+      g.copy(child = recursiveRemoveSort(originChild, true))
   }
 
-  private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = {
+  /**
+   * If the upper sort is global then we can remove the global or local sort recursively.
+   * If the upper sort is local then we can only remove the local sort recursively.
+   */
+  private def recursiveRemoveSort(
+      plan: LogicalPlan,
+      canRemoveGlobalSort: Boolean): LogicalPlan = {
     if (!plan.containsPattern(SORT)) {
       return plan
     }
     plan match {
-      case Sort(_, _, child) => recursiveRemoveSort(child)
+      case Sort(_, global, child) if canRemoveGlobalSort || !global =>
+        recursiveRemoveSort(child, canRemoveGlobalSort)
       case other if canEliminateSort(other) =>
-        other.withNewChildren(other.children.map(recursiveRemoveSort))
+        other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, canRemoveGlobalSort)))
+      case other if canEliminateGlobalSort(other) =>
+        other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, true)))
       case _ => plan
     }
   }
@@ -1467,6 +1477,10 @@ object EliminateSorts extends Rule[LogicalPlan] {
   private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
     case p: Project => p.projectList.forall(_.deterministic)
     case f: Filter => f.condition.deterministic
+    case _ => false
+  }
+
+  private def canEliminateGlobalSort(plan: LogicalPlan): Boolean = plan match {
     case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
     case r: RebalancePartitions => r.partitionExpressions.forall(_.deterministic)
     case _: Repartition => true
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 01ecbd808c2..053bc1c2137 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -422,4 +422,20 @@ class EliminateSortsSuite extends AnalysisTest {
       comparePlans(optimized, correctAnswer)
     }
   }
+
+  test("SPARK-39835: Fix EliminateSorts remove global sort below the local sort") {
+    // global -> local
+    val plan = testRelation.orderBy($"a".asc).sortBy($"c".asc).analyze
+    comparePlans(Optimize.execute(plan), plan)
+
+    // global -> global -> local
+    val plan2 = testRelation.orderBy($"a".asc).orderBy($"b".asc).sortBy($"c".asc).analyze
+    val expected2 = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
+    comparePlans(Optimize.execute(plan2), expected2)
+
+    // local -> global -> local
+    val plan3 = testRelation.sortBy($"a".asc).orderBy($"b".asc).sortBy($"c".asc).analyze
+    val expected3 = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
+    comparePlans(Optimize.execute(plan3), expected3)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org