Posted to commits@spark.apache.org by gu...@apache.org on 2022/03/27 00:32:12 UTC
[spark] branch branch-3.3 updated: [SPARK-38308][SQL] Eagerly iterate over sequence of window expressions in `ExtractWindowExpressions`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new 7fcdd71 [SPARK-38308][SQL] Eagerly iterate over sequence of window expressions in `ExtractWindowExpressions`
7fcdd71 is described below
commit 7fcdd71fa976d910425143c9e733385c7cba37ca
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Sun Mar 27 09:29:49 2022 +0900
[SPARK-38308][SQL] Eagerly iterate over sequence of window expressions in `ExtractWindowExpressions`
### What changes were proposed in this pull request?
Pass an `IndexedSeq` (likely a `Vector`) to `ExtractWindowExpressions.extract` and `ExtractWindowExpressions.addWindow` rather than whatever sequence type was specified by the user (in the `Dataset.select` method).
To accomplish this, we only need to pass an `IndexedSeq` to `ExtractWindowExpressions.extract`, which then returns another `IndexedSeq` that is passed on to `ExtractWindowExpressions.addWindow`.
### Why are the changes needed?
Consider this query:
```
val df = spark.range(0, 20).map { x =>
  (x % 4, x + 1, x + 2)
}.toDF("a", "b", "c")

import org.apache.spark.sql.expressions._

val w = Window.partitionBy("a").orderBy("b")
val selectExprs = Stream(
  sum("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("sumc"),
  avg("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("avgc")
)

df.select(selectExprs: _*).show(false)
```
It fails with
```
org.apache.spark.sql.AnalysisException: Resolved attribute(s) avgc#23 missing from c#16L,a#14L,b#15L,sumc#21L in operator !Project [c#16L, a#14L, b#15L, sumc#21L, sumc#21L, avgc#23].;
```
If you change `Stream` to a `Seq`, it succeeds.
As with SPARK-38221 and SPARK-38528, this is due to the use of this code pattern:
```
def someMethod (seq: Seq[xxx]) {
  ...
  val outerDataStructure = <create outer data structure>
  val newSeq = seq.map { x =>
    ...
    code that puts something in outerDataStructure
    ...
  }
  ...
  code that uses outerDataStructure (and expects it to be populated)
  ...
}
```
If `seq` is a `Stream`, `seq.map` might be evaluated lazily, in which case `outerDataStructure` will not be fully populated before it is used.
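The pattern can be reproduced without Spark. The following is a minimal sketch, not code from the analyzer: `LazyMapDemo`, `collectSideEffects`, and the `Int` element type are hypothetical names chosen for illustration. It relies on `Stream.map` applying the function to the head immediately and deferring the tail.

```scala
import scala.collection.mutable.ArrayBuffer

object LazyMapDemo {
  // Hypothetical stand-in for the pattern above: map over `seq`, record each
  // visited element in an outer buffer, then read the buffer right away.
  def collectSideEffects(seq: Seq[Int]): (Seq[Int], Int) = {
    val outer = ArrayBuffer.empty[Int]
    val mapped = seq.map { x =>
      outer += x // side effect on the outer data structure
      x * 2
    }
    // The buffer is read here, before anything has forced `mapped`.
    (mapped, outer.size)
  }

  def main(args: Array[String]): Unit = {
    val (_, fromList) = collectSideEffects(List(1, 2, 3))
    val (_, fromStream) = collectSideEffects(Stream(1, 2, 3))
    // The List is fully visited; the Stream has only had its head visited.
    println(s"List: $fromList, Stream: $fromStream")
  }
}
```

With a `List`, all three elements are recorded before the buffer is read. With a `Stream`, only the head has been recorded at that point, which mirrors the partially populated buffer described here.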
Both `ExtractWindowExpressions.extract` and `ExtractWindowExpressions.addWindow` use this pattern, but the above example failure is due to the pattern's use in `ExtractWindowExpressions.addWindow` (`extractedWindowExprBuffer` does not get fully populated, so the Window operator does not produce the output expected by its parent projection).
I chose `IndexedSeq` not for its efficient indexing, but because `map` will eagerly iterate over it.
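The effect of the conversion can be sketched in isolation. This is an illustrative example under the same assumptions as the pattern above, not the analyzer code: `EagerMapDemo` and `collectEagerly` are hypothetical names. Calling `toIndexedSeq` on a `Stream` materializes every element, and `map` on the result is strict.

```scala
import scala.collection.mutable.ArrayBuffer

object EagerMapDemo {
  // Hypothetical helper: convert to IndexedSeq first, so that `map` visits
  // every element before the outer buffer is read.
  def collectEagerly(seq: Seq[Int]): Int = {
    val outer = ArrayBuffer.empty[Int]
    val mapped = seq.toIndexedSeq.map { x =>
      outer += x // side effect on the outer data structure
      x * 2
    }
    // `mapped` is already fully built, so the buffer holds all elements.
    assert(mapped.size == outer.size)
    outer.size
  }

  def main(args: Array[String]): Unit = {
    println(collectEagerly(Stream(1, 2, 3)))
  }
}
```

The trade-off is that the whole input is materialized up front, which is acceptable for a finite project list.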
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
New unit test.
Closes #35635 from bersprockets/window_expression_stream_issue.
Authored-by: Bruce Robbins <be...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit eb30a27e53158e64fffaa6d32ff9369ffbae0384)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 2 +-
.../spark/sql/DataFrameWindowFunctionsSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5289983..6d95067 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3141,7 +3141,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       // have been resolved.
       case p @ Project(projectList, child)
         if hasWindowFunction(projectList) && !p.expressions.exists(!_.resolved) =>
-        val (windowExpressions, regularExpressions) = extract(projectList)
+        val (windowExpressions, regularExpressions) = extract(projectList.toIndexedSeq)
         // We add a project to get all needed expressions for window expressions from the child
         // of the original Project operator.
         val withProject = Project(regularExpressions, child)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 11b2309..4676f8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -1122,4 +1122,28 @@ class DataFrameWindowFunctionsSuite extends QueryTest
       assert(shuffleByRequirement, "Can't find desired shuffle node from the query plan")
     }
   }
+
+  test("SPARK-38308: Properly handle Stream of window expressions") {
+    val df = Seq(
+      (1, 2, 3),
+      (1, 3, 4),
+      (2, 4, 5),
+      (2, 5, 6)
+    ).toDF("a", "b", "c")
+
+    val w = Window.partitionBy("a").orderBy("b")
+    val selectExprs = Stream(
+      sum("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("sumc"),
+      avg("c").over(w.rowsBetween(Window.unboundedPreceding, Window.currentRow)).as("avgc")
+    )
+    checkAnswer(
+      df.select(selectExprs: _*),
+      Seq(
+        Row(3, 3),
+        Row(7, 3.5),
+        Row(5, 5),
+        Row(11, 5.5)
+      )
+    )
+  }
 }