Posted to commits@spark.apache.org by do...@apache.org on 2020/10/27 20:29:24 UTC
[spark] branch branch-3.0 updated: [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c95d925 [SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
c95d925 is described below
commit c95d925a9add139e87c7b9f5c95571fdb034f724
Author: Ankur Dave <an...@gmail.com>
AuthorDate: Tue Oct 27 13:20:22 2020 -0700
[SPARK-33260][SQL] Fix incorrect results from SortExec when sortOrder is Stream
### What changes were proposed in this pull request?
The following query produces incorrect results. The query has two essential features: (1) it contains a string aggregate, resulting in a `SortExec` node, and (2) it contains a duplicate grouping key, causing `RemoveRepetitionFromGroupExpressions` to produce a sort order stored as a `Stream`.
```sql
SELECT bigint_col_1, bigint_col_9, MAX(CAST(bigint_col_1 AS string))
FROM table_4
GROUP BY bigint_col_1, bigint_col_9, bigint_col_9
```
When the sort order is stored as a `Stream`, the line `ordering.map(_.child.genCode(ctx))` in `GenerateOrdering#createOrderKeys()` has unpredictable side effects on `ctx`. This is because `genCode(ctx)` modifies `ctx`. When `ordering` is a `Stream`, `map` evaluates only the first element immediately; the remaining `genCode(ctx)` calls are deferred until the returned `Stream` is consumed later, by which point `ctx` may be in a different state than intended.
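A minimal sketch of the deferred evaluation described above, using plain Scala collections rather than Spark's `CodegenContext`. The `trace` helper and its log strings are hypothetical: each log entry stands in for one side effect that `genCode(ctx)` would have on `ctx`. It assumes Scala 2.12 or 2.13 semantics, where `Stream` evaluates its head eagerly and its tail lazily (in 2.13 `Stream` is deprecated in favor of `LazyList`).

```scala
import scala.collection.mutable.ArrayBuffer

object StreamMapLaziness {
  // Hypothetical helper: records when each mapped element is actually evaluated.
  // The log entries stand in for the side effects of `genCode(ctx)` on `ctx`.
  def trace(ordering: Seq[String]): List[String] = {
    val log = ArrayBuffer.empty[String]
    val mapped = ordering.map { s => log += s"genCode($s)"; s }
    log += "caller continues" // the caller goes on to touch shared state
    mapped.toList             // consuming the result forces any deferred elements
    log.toList
  }

  def main(args: Array[String]): Unit = {
    // Stream: only the head runs inside map; the rest runs after the caller moved on.
    println(trace(Stream("a", "b", "c")))
    // prints List(genCode(a), caller continues, genCode(b), genCode(c))

    // List (any strict Seq): every element runs before map returns.
    println(trace(List("a", "b", "c")))
    // prints List(genCode(a), genCode(b), genCode(c), caller continues)
  }
}
```

Because the static type is just `Seq[String]`, nothing at the call site signals that `map` might be lazy; the behavior depends entirely on the runtime collection.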
Similar bugs have occurred at least three times in the past: https://issues.apache.org/jira/browse/SPARK-24500, https://issues.apache.org/jira/browse/SPARK-25767, https://issues.apache.org/jira/browse/SPARK-26680.
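The fix converts `ordering` with `toIndexedSeq` before mapping over it. When `ordering` is a `Stream`, this forces the modifications to `ctx` to happen immediately; for collections that are already strict, the generated code is unchanged.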
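The sketch below models why the deferred calls corrupt the generated keys and how `toIndexedSeq` avoids it. It is a hypothetical model, not Spark code: `FakeCtx` stands in for `CodegenContext`, its `inputRow` field plays the role of `ctx.INPUT_ROW`, and `keyPairs` assumes a caller that generates keys for a row named `a` and then for a row named `b` through the same context before pairing them for comparison.

```scala
// Hypothetical stand-in for CodegenContext: genCode reads this shared, mutable state.
final class FakeCtx {
  var inputRow: String = null // plays the role of ctx.INPUT_ROW
  var calls: Int = 0          // counts how many genCode calls have actually run
}

object OrderKeysDemo {
  // Stand-in for `child.genCode(ctx)`: its output depends on ctx state at call time.
  def genCode(column: String, ctx: FakeCtx): String = {
    ctx.calls += 1
    s"${ctx.inputRow}.$column"
  }

  // Pre-fix shape: on a Stream, map runs genCode for the head only; the rest is deferred.
  def createOrderKeysLazy(ctx: FakeCtx, row: String, ordering: Seq[String]): Seq[String] = {
    ctx.inputRow = row
    ordering.map(genCode(_, ctx))
  }

  // Post-fix shape: toIndexedSeq yields a strict Vector, so every genCode runs here.
  def createOrderKeysEager(ctx: FakeCtx, row: String, ordering: Seq[String]): Seq[String] = {
    ctx.inputRow = row
    ordering.toIndexedSeq.map(genCode(_, ctx))
  }

  // Hypothetical caller: keys for row "a", then row "b", sharing one ctx.
  def keyPairs(
      create: (FakeCtx, String, Seq[String]) => Seq[String],
      ordering: Seq[String]): List[(String, String)] = {
    val ctx = new FakeCtx
    val aKeys = create(ctx, "a", ordering)
    val bKeys = create(ctx, "b", ordering)
    aKeys.zip(bKeys).toList
  }

  def main(args: Array[String]): Unit = {
    val ordering = Stream("x", "y", "z")
    println(keyPairs(createOrderKeysLazy, ordering))
    // prints List((a.x,b.x), (b.y,b.y), (b.z,b.z))
    println(keyPairs(createOrderKeysEager, ordering))
    // prints List((a.x,b.x), (a.y,b.y), (a.z,b.z))
  }
}
```

With the lazy version, every key for row `a` after the first is generated only after `inputRow` has been switched to `b`, so those pairs compare `b` against itself and can never break a tie. The eager version binds each key to its own row, which is the property the one-line `toIndexedSeq` change restores.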
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test for `SortExec` where `sortOrder` is a `Stream`. The test previously failed and now passes.
Closes #30160 from ankurdave/SPARK-33260.
Authored-by: Ankur Dave <an...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
(cherry picked from commit 3f2a2b5fe6ada37ef86f00737387e6cf2496df74)
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../sql/catalyst/expressions/codegen/GenerateOrdering.scala | 4 +++-
.../scala/org/apache/spark/sql/execution/SortSuite.scala | 13 +++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 63bd59e..5d00519 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -71,7 +71,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], BaseOrdering] with
ctx.INPUT_ROW = row
// to use INPUT_ROW we must make sure currentVars is null
ctx.currentVars = null
- ordering.map(_.child.genCode(ctx))
+ // SPARK-33260: To avoid unpredictable modifications to `ctx` when `ordering` is a Stream, we
+ // use `toIndexedSeq` to make the transformation eager.
+ ordering.toIndexedSeq.map(_.child.genCode(ctx))
}
/**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 7654a9d..6a4f3f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -97,6 +97,19 @@ class SortSuite extends SparkPlanTest with SharedSparkSession {
}
}
+ test("SPARK-33260: sort order is a Stream") {
+ val input = Seq(
+ ("Hello", 4, 2.0),
+ ("Hello", 1, 1.0),
+ ("World", 8, 3.0)
+ )
+ checkAnswer(
+ input.toDF("a", "b", "c"),
+ (child: SparkPlan) => SortExec(Stream('a.asc, 'b.asc, 'c.asc), global = true, child = child),
+ input.sortBy(t => (t._1, t._2, t._3)).map(Row.fromTuple),
+ sortAnswers = false)
+ }
+
// Test sorting on different data types
for (
dataType <- DataTypeTestUtils.atomicTypes ++ Set(NullType);