Posted to commits@spark.apache.org by do...@apache.org on 2020/10/27 20:29:24 UTC

[spark] branch branch-3.0 updated: [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new c95d925  [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
c95d925 is described below

commit c95d925a9add139e87c7b9f5c95571fdb034f724
Author: Ankur Dave <an...@gmail.com>
AuthorDate: Tue Oct 27 13:20:22 2020 -0700

    [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
    
    ### What changes were proposed in this pull request?
    
    The following query produces incorrect results. The query has two essential features: (1) it contains a string aggregate, resulting in a `SortExec` node, and (2) it contains a duplicate grouping key, causing `RemoveRepetitionFromGroupExpressions` to produce a sort order stored as a `Stream`.
    
    ```sql
    SELECT bigint_col_1, bigint_col_9, MAX(CAST(bigint_col_1 AS string))
    FROM table_4
    GROUP BY bigint_col_1, bigint_col_9, bigint_col_9
    ```
    
    When the sort order is stored as a `Stream`, the line `ordering.map(_.child.genCode(ctx))` in `GenerateOrdering#createOrderKeys()` produces unpredictable side effects on `ctx`. This is because `genCode(ctx)` modifies `ctx`. When `ordering` is a `Stream`, the modifications do not happen immediately as intended, but instead occur lazily when the returned `Stream` is consumed later.
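    
    The deferred side effects can be reproduced outside Spark. The following is a minimal sketch, not Spark code: `Ctx`, `genCode`, and `createKeysLazily` are hypothetical stand-ins for `CodegenContext`, `Expression#genCode`, and `createOrderKeys()`, and `Stream` is deprecated in favor of `LazyList` from Scala 2.13 onward.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    object StreamSideEffectDemo {
      // Hypothetical stand-in for CodegenContext: records each genCode call.
      final class Ctx { val log: ArrayBuffer[String] = ArrayBuffer.empty[String] }
    
      // Hypothetical stand-in for genCode: mutates ctx as a side effect.
      def genCode(name: String, ctx: Ctx): String = {
        ctx.log += name
        s"code($name)"
      }
    
      // Mirrors the shape of the original line: a plain map over a Seq.
      def createKeysLazily(ordering: Seq[String], ctx: Ctx): Seq[String] =
        ordering.map(genCode(_, ctx))
    
      def main(args: Array[String]): Unit = {
        val ctx = new Ctx
        val keys = createKeysLazily(Stream("a", "b", "c"), ctx)
        // Stream.map evaluates the head eagerly and defers the tail,
        // so not all three side effects have happened yet.
        println(s"after map: ${ctx.log.size} of 3 side effects applied")
        // Consuming the result is what finally triggers the remaining ones.
        keys.toList
        println(s"after consuming: ${ctx.log.size} of 3 side effects applied")
      }
    }
    ```
    
    In this sketch, any change made to `ctx` between the `map` call and the point where `keys` is consumed would be visible to the deferred `genCode` calls, which is the kind of interference described above.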
    
    Similar bugs have occurred at least three times in the past: https://issues.apache.org/jira/browse/SPARK-24500, https://issues.apache.org/jira/browse/SPARK-25767, https://issues.apache.org/jira/browse/SPARK-26680.
    
    The fix is to call `toIndexedSeq` on `ordering` before mapping, which forces the modifications to happen immediately whether or not `ordering` is a `Stream`.
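    
    As an illustration of why converting with `toIndexedSeq` before `map` (as the patch does) makes the side effects eager, here is a minimal sketch. `Ctx`, `genCode`, and `createKeysEagerly` are hypothetical names used only for this example and are not part of Spark.
    
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    object EagerOrderingDemo {
      // Hypothetical stand-in for CodegenContext: records each genCode call.
      final class Ctx { val log: ArrayBuffer[String] = ArrayBuffer.empty[String] }
    
      // Hypothetical stand-in for genCode: mutates ctx as a side effect.
      def genCode(name: String, ctx: Ctx): String = {
        ctx.log += name
        s"code($name)"
      }
    
      // toIndexedSeq materializes the input into a strict collection,
      // so the following map runs every genCode call before returning.
      def createKeysEagerly(ordering: Seq[String], ctx: Ctx): Seq[String] =
        ordering.toIndexedSeq.map(genCode(_, ctx))
    
      def main(args: Array[String]): Unit = {
        val ctx = new Ctx
        createKeysEagerly(Stream("a", "b", "c"), ctx)
        // All side effects are applied even though the result was never consumed.
        println(s"after map: ${ctx.log.size} of 3 side effects applied")
      }
    }
    ```
    
    Converting unconditionally keeps the call site simple; for an input that is already strict, the conversion only adds the cost of building an indexed copy when the input is not already an `IndexedSeq`.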
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test for `SortExec` where `sortOrder` is a `Stream`. The test previously failed and now passes.
    
    Closes #30160 from ankurdave/SPARK-33260.
    
    Authored-by: Ankur Dave <an...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 3f2a2b5fe6ada37ef86f00737387e6cf2496df74)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/catalyst/expressions/codegen/GenerateOrdering.scala |  4 +++-
 .../scala/org/apache/spark/sql/execution/SortSuite.scala    | 13 +++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 63bd59e..5d00519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -71,7 +71,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], BaseOrdering] with
     ctx.INPUT_ROW = row
     // to use INPUT_ROW we must make sure currentVars is null
     ctx.currentVars = null
-    ordering.map(_.child.genCode(ctx))
+    // SPARK-33260: To avoid unpredictable modifications to `ctx` when `ordering` is a Stream, we
+    // use `toIndexedSeq` to make the transformation eager.
+    ordering.toIndexedSeq.map(_.child.genCode(ctx))
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 7654a9d..6a4f3f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -97,6 +97,19 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-33260: sort order is a Stream") {
+    val input = Seq(
+      ("Hello", 4, 2.0),
+      ("Hello", 1, 1.0),
+      ("World", 8, 3.0)
+    )
+    checkAnswer(
+      input.toDF("a", "b", "c"),
+      (child: SparkPlan) => SortExec(Stream('a.asc, 'b.asc, 'c.asc), global = true, child = child),
+      input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
+      sortAnswers = false)
+  }
+
   // Test sorting on different data types
   for (
     dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);

