You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/08/03 04:03:39 UTC

[spark] branch branch-3.3 updated: [SPARK-39867][SQL] Global limit should not inherit OrderPreservingUnaryNode

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 2254240dba4 [SPARK-39867][SQL] Global limit should not inherit OrderPreservingUnaryNode
2254240dba4 is described below

commit 2254240dba4a71d9a68a22ca9a83080351fa3343
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Wed Aug 3 11:59:22 2022 +0800

    [SPARK-39867][SQL] Global limit should not inherit OrderPreservingUnaryNode
    
    Make GlobalLimit inherit UnaryNode rather than OrderPreservingUnaryNode
    
    Global limit cannot promise that its output ordering is the same as its child's; that actually depends on the specific physical plan.
    
    For all physical plans with global limits:
    - CollectLimitExec: it does not promise output ordering
    - GlobalLimitExec: it requires all tuples, so it can assume the child is a shuffle or a single partition. It can then use the output ordering of the child
    - TakeOrderedAndProjectExec: it does the sort inside its implementation
    
    This bug gets worse since we pulled out the v1 write required ordering.
    
    yes, bug fix
    
    fix test and add test
    
    Closes #37284 from ulysses-you/sort.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit e9cc1024df4d587a0f456842d495db91984ed9db)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../catalyst/plans/logical/basicLogicalOperators.scala    |  7 ++++++-
 .../sql/catalyst/optimizer/EliminateSortsSuite.scala      | 15 ++++++++++-----
 2 files changed, 16 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 774f6956162..e12a5918ee0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -1248,8 +1248,13 @@ object Limit {
  * A global (coordinated) limit. This operator can emit at most `limitExpr` number in total.
  *
  * See [[Limit]] for more information.
+ *
+ * Note that, we can not make it inherit [[OrderPreservingUnaryNode]] due to the different strategy
+ * of physical plan. The output ordering of child will be broken if a shuffle exchange comes in
+ * between the child and global limit, due to the fact that shuffle reader fetches blocks in random
+ * order.
  */
-case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends OrderPreservingUnaryNode {
+case class GlobalLimit(limitExpr: Expression, child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def maxRows: Option[Long] = {
     limitExpr match {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
index 7ceac3b3000..b97dc455dad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSortsSuite.scala
@@ -115,9 +115,9 @@ class EliminateSortsSuite extends AnalysisTest {
 
   test("SPARK-33183: remove redundant sort by") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc_nullsFirst)
-    val unnecessaryReordered = orderedPlan.limit(2).select('a).sortBy('a.asc, 'b.desc_nullsFirst)
+    val unnecessaryReordered = LocalLimit(2, orderedPlan).select('a).sortBy('a.asc, 'b.desc_nullsFirst)
     val optimized = Optimize.execute(unnecessaryReordered.analyze)
-    val correctAnswer = orderedPlan.limit(2).select('a).analyze
+    val correctAnswer = LocalLimit(2, orderedPlan).select('a).analyze
     comparePlans(optimized, correctAnswer)
   }
 
@@ -161,11 +161,11 @@ class EliminateSortsSuite extends AnalysisTest {
     comparePlans(optimized, correctAnswer)
   }
 
-  test("SPARK-33183: limits should not affect order for local sort") {
+  test("SPARK-33183: local limits should not affect order for local sort") {
     val orderedPlan = testRelation.select('a, 'b).orderBy('a.asc, 'b.desc)
-    val filteredAndReordered = orderedPlan.limit(Literal(10)).sortBy('a.asc, 'b.desc)
+    val filteredAndReordered = LocalLimit(10, orderedPlan).sortBy('a.asc, 'b.desc)
     val optimized = Optimize.execute(filteredAndReordered.analyze)
-    val correctAnswer = orderedPlan.limit(Literal(10)).analyze
+    val correctAnswer = LocalLimit(10, orderedPlan).analyze
     comparePlans(optimized, correctAnswer)
   }
 
@@ -442,4 +442,9 @@ class EliminateSortsSuite extends AnalysisTest {
       .sortBy($"c".asc).analyze
     comparePlans(Optimize.execute(plan3), expected3)
   }
+
+  test("SPARK-39867: Global limit should not inherit OrderPreservingUnaryNode") {
+    val plan = testRelation.sortBy($"a".asc).limit(2).sortBy($"a".asc).analyze
+    comparePlans(Optimize.execute(plan), plan)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org