You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/05 10:27:13 UTC

[spark] branch branch-3.1 updated: [SPARK-39867][SQL][3.1] Global limit should not inherit OrderPreservingUnaryNode

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new a25394ad775 [SPARK-39867][SQL][3.1] Global limit should not inherit OrderPreservingUnaryNode
a25394ad775 is described below

commit a25394ad775e132f96754f2cb84be08c8b7014a6
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Aug 5 18:26:57 2022 +0800

    [SPARK-39867][SQL][3.1] Global limit should not inherit OrderPreservingUnaryNode
    
    backport branch-3.1 for https://github.com/apache/spark/pull/37284
    
    ### What changes were proposed in this pull request?
    
    Make GlobalLimit inherit UnaryNode rather than OrderPreservingUnaryNode
    
    ### Why are the changes needed?
    
    Global limit cannot promise that the output ordering is the same as its child's; it actually depends on the specific physical plan.
    
    For all physical plans with global limits:
    - CollectLimitExec: it does not promise output ordering
    - GlobalLimitExec: it requires all tuples, so it can assume the child is a shuffle or the child has a single partition. Then it can use the output ordering of the child
    - TakeOrderedAndProjectExec: it does the sort inside its implementation
    
    This bug gets worse since we pulled out the required ordering for v1 writes.
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    fix test and add test
    
    Closes #37398 from ulysses-you/SPARK-39867-3.1.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/plans/logical/basicLogicalOperators.scala   |  7 ++++++-
 .../sql/catalyst/optimizer/EliminateSortsSuite.scala     | 16 +++++++++++-----
 2 files changed, 17 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 1d8d4e1952b..58dcbfdad5d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -903,8 +903,13 @@ object Limit {
  * A global (coordinated) limit. This operator can emit at most `limitExpr` number in total.
  *
  * See [[Limit]] for more information.
+ *
+ * Note that, we can not make it inherit [[OrderPreservingUnaryNode]] due to the different strategy
+ * of physical plan. The output ordering of child will be broken if a shuffle exchange comes in
+ * between the child and global limit, due to the fact that shuffle reader fetches blocks in random
+ * order.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index ca7d3864809..d8a258e4208 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -108,9 +108,10 @@ class EliminateSortsSuite extends AnalysisTest {
 
   test("SPARK-33183: remove redundant sort by") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
-    val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = LocalLimit(2, orderedPlan).select('a)
+      .sortBy('a.asc, 'b.desc_nullsFirst)
     val optimized = Optimize.execute(unnecessaryReordered.analyze)
-    val correctAnswer = orderedPlan.limit(2).select('a).analyze
+    val correctAnswer = LocalLimit(2, orderedPlan).select('a).analyze
     comparePlans(optimized, correctAnswer)
   }
 
@@ -154,11 +155,11 @@ class EliminateSortsSuite extends AnalysisTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("SPARK-33183: limits should not affect order for local sort") {
+  test("SPARK-33183: local limits should not affect order for local sort") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
-    val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc)
+    val filteredAndReordered = LocalLimit(10, orderedPlan).sortBy('a.asc, 'b.desc)
     val optimized = Optimize.execute(filteredAndReordered.analyze)
-    val correctAnswer = orderedPlan.limit(Literal(10)).analyze
+    val correctAnswer = LocalLimit(10, orderedPlan).analyze
     comparePlans(optimized, correctAnswer)
   }
 
@@ -435,4 +436,9 @@ class EliminateSortsSuite extends AnalysisTest {
       .sortBy($"c".asc).analyze
     comparePlans(Optimize.execute(plan3), expected3)
   }
+
+  test("SPARK-39867: Global limit should not inherit OrderPreservingUnaryNode") {
+    val plan = testRelation.sortBy($"a".asc).limit(2).sortBy($"a".asc).analyze
+    comparePlans(Optimize.execute(plan), plan)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org