Posted to commits@spark.apache.org by gu...@apache.org on 2022/03/27 00:35:14 UTC
[spark] branch branch-3.1 updated: [SPARK-38528][SQL][3.2] Eagerly iterate over aggregate sequence when building project list in `ExtractGenerator`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new b3e3110 [SPARK-38528][SQL][3.2] Eagerly iterate over aggregate sequence when building project list in `ExtractGenerator`
b3e3110 is described below
commit b3e31107277cea5e441eb3708535e740712027a6
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Sun Mar 27 09:31:49 2022 +0900
[SPARK-38528][SQL][3.2] Eagerly iterate over aggregate sequence when building project list in `ExtractGenerator`
Backport of #35837.
When building the project list from an aggregate sequence in `ExtractGenerator`, convert the aggregate sequence to an `IndexedSeq` before performing the flatMap operation.
This query fails with a `NullPointerException`:
```
val df = Seq(1, 2, 3).toDF("v")
df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*).collect
```
If you change `Stream` to `Seq`, then it succeeds.
`ExtractGenerator` uses a flatMap operation over `aggList` for two purposes:
- To produce a new aggregate list.
- To update `projectExprs` (which is initialized as an array of nulls).
When `aggList` is a `Stream`, the flatMap operation evaluates lazily, so all entries in `projectExprs` after the first will still be null when the rule completes.
Changing `aggList` to an `IndexedSeq` forces the flatMap to evaluate eagerly.
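The lazy-evaluation behaviour described above can be reproduced outside Spark. The following is a minimal standalone sketch, not the code in `Analyzer.scala`: the object `LazyFlatMapSketch`, the method `fill`, and the array `slots` are hypothetical names chosen for illustration, with `slots` standing in for `projectExprs`. It assumes the standard library `Stream` of Scala 2.12 (the same class still exists, deprecated, in 2.13), which evaluates its head strictly and its tail lazily.
```scala
object LazyFlatMapSketch {
  // Fills `slots` as a side effect of flatMap, loosely mirroring how the
  // rule populates projectExprs while it builds the new aggregate list.
  def fill(items: Seq[String]): Array[String] = {
    val slots = Array.ofDim[String](items.length) // starts as all nulls
    // The result is deliberately never traversed, so a lazy collection
    // only runs the function body for its first element.
    val mapped = items.zipWithIndex.flatMap { case (s, i) =>
      slots(i) = s.toUpperCase
      Seq(s)
    }
    slots
  }

  def main(args: Array[String]): Unit = {
    // Stream input: only the head is processed.
    println(fill(Stream("a", "b", "c")).mkString(", "))              // A, null, null
    // Converting to an IndexedSeq first forces every element.
    println(fill(Stream("a", "b", "c").toIndexedSeq).mkString(", ")) // A, B, C
  }
}
```
With a `Stream` argument only the first slot is written, which matches the description of `projectExprs` above; calling `toIndexedSeq` before the `zipWithIndex`/`flatMap` chain makes the side effects run for every element.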
User-facing change: No.
Testing: New unit test.
Closes #35851 from bersprockets/generator_aggregate_issue_32.
Authored-by: Bruce Robbins <be...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 7842621ff50001e1cde8e2e6a2fc48c2cdcaf3d4)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 1 +
.../test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 7 +++++++
2 files changed, 8 insertions(+)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae5e2d1..aacc801 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2649,6 +2649,7 @@ class Analyzer(override val catalogManager: CatalogManager)
val projectExprs = Array.ofDim[NamedExpression](aggList.length)
val newAggList = aggList
+ .toIndexedSeq
.map(trimNonTopLevelAliases)
.zipWithIndex
.flatMap {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 8f44903..5ac2383 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -356,6 +356,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
val df = Seq(1, 2, 3).toDF("v")
checkAnswer(df.select(explode(array(min($"v"), max($"v")))), Row(1) :: Row(3) :: Nil)
}
+
+ test("SPARK-38528: generator in stream of aggregate expressions") {
+ val df = Seq(1, 2, 3).toDF("v")
+ checkAnswer(
+ df.select(Stream(explode(array(min($"v"), max($"v"))), sum($"v")): _*),
+ Row(1, 6) :: Row(3, 6) :: Nil)
+ }
}
case class EmptyGenerator() extends Generator {