You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/25 10:36:00 UTC

[spark] branch master updated: [SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dca26d514a [SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort
5dca26d514a is described below

commit 5dca26d514a150bda58f7c4919624c9638498fec
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Jul 25 18:35:03 2022 +0800

    [SPARK-39835][SQL] Fix EliminateSorts remove global sort below the local sort
    
    ### What changes were proposed in this pull request?
    
    Correct `EliminateSorts` as follows:
    
    - If the upper sort is global then we can remove the global or local sort recursively.
    - If the upper sort is local then we can only remove the local sort recursively.
    
    ### Why are the changes needed?
    
    If a global sort is below a local sort, we should not remove the global sort because doing so can change the output partitioning.
    
    This issue is going to get worse since we pull out the V1 Write sort to the logical side.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    add test
    
    Closes #37250 from ulysses-you/remove-sort.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 26 +++++++++++++++++-----
 .../catalyst/optimizer/EliminateSortsSuite.scala   | 16 +++++++++++++
 2 files changed, 36 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 02a64a22ed4..8bbec13e9e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1528,21 +1528,31 @@ object EliminateSorts extends Rule[LogicalPlan] {
       }
     case Sort(orders, false, child) if SortOrder.orderingSatisfies(child.outputOrdering, orders) =>
       applyLocally.lift(child).getOrElse(child)
-    case s @ Sort(_, _, child) => s.copy(child = recursiveRemoveSort(child))
+    case s @ Sort(_, global, child) => s.copy(child = recursiveRemoveSort(child, global))
     case j @ Join(originLeft, originRight, _, cond, _) if cond.forall(_.deterministic) =>
-      j.copy(left = recursiveRemoveSort(originLeft), right = recursiveRemoveSort(originRight))
+      j.copy(left = recursiveRemoveSort(originLeft, true),
+        right = recursiveRemoveSort(originRight, true))
     case g @ Aggregate(_, aggs, originChild) if isOrderIrrelevantAggs(aggs) =>
-      g.copy(child = recursiveRemoveSort(originChild))
+      g.copy(child = recursiveRemoveSort(originChild, true))
   }
 
-  private def recursiveRemoveSort(plan: LogicalPlan): LogicalPlan = {
+  /**
+   * If the upper sort is global then we can remove the global or local sort recursively.
+   * If the upper sort is local then we can only remove the local sort recursively.
+   */
+  private def recursiveRemoveSort(
+      plan: LogicalPlan,
+      canRemoveGlobalSort: Boolean): LogicalPlan = {
     if (!plan.containsPattern(SORT)) {
       return plan
     }
     plan match {
-      case Sort(_, _, child) => recursiveRemoveSort(child)
+      case Sort(_, global, child) if canRemoveGlobalSort || !global =>
+        recursiveRemoveSort(child, canRemoveGlobalSort)
       case other if canEliminateSort(other) =>
-        other.withNewChildren(other.children.map(recursiveRemoveSort))
+        other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, canRemoveGlobalSort)))
+      case other if canEliminateGlobalSort(other) =>
+        other.withNewChildren(other.children.map(c => recursiveRemoveSort(c, true)))
       case _ => plan
     }
   }
@@ -1550,6 +1560,10 @@ object EliminateSorts extends Rule[LogicalPlan] {
   private def canEliminateSort(plan: LogicalPlan): Boolean = plan match {
     case p: Project => p.projectList.forall(_.deterministic)
     case f: Filter => f.condition.deterministic
+    case _ => false
+  }
+
+  private def canEliminateGlobalSort(plan: LogicalPlan): Boolean = plan match {
     case r: RepartitionByExpression => r.partitionExpressions.forall(_.deterministic)
     case r: RebalancePartitions => r.partitionExpressions.forall(_.deterministic)
     case _: Repartition => true
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 865a06368a4..1d879a7065e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -424,4 +424,20 @@ class EliminateSortsSuite extends AnalysisTest {
       comparePlans(optimized, correctAnswer)
     }
   }
+
+  test("SPARK-39835: Fix EliminateSorts remove global sort below the local sort") {
+    // global -> local
+    val plan = testRelation.orderBy($"a".asc).sortBy($"c".asc).analyze
+    comparePlans(Optimize.execute(plan), plan)
+
+    // global -> global -> local
+    val plan2 = testRelation.orderBy($"a".asc).orderBy($"b".asc).sortBy($"c".asc).analyze
+    val expected2 = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
+    comparePlans(Optimize.execute(plan2), expected2)
+
+    // local -> global -> local
+    val plan3 = testRelation.sortBy($"a".asc).orderBy($"b".asc).sortBy($"c".asc).analyze
+    val expected3 = testRelation.orderBy($"b".asc).sortBy($"c".asc).analyze
+    comparePlans(Optimize.execute(plan3), expected3)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org