You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/15 04:22:53 UTC
[spark] branch master updated: [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec`
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e097f916a27 [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec`
e097f916a27 is described below
commit e097f916a2769dfe82bfd216fedcd6962e8280c8
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Fri Sep 15 13:22:40 2023 +0900
[SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec`
### What changes were proposed in this pull request?
Before evaluating the generator function in `GenerateExec`, initialize non-deterministic expressions.
### Why are the changes needed?
The following query fails:
```
select *
from explode(
transform(sequence(0, cast(rand()*1000 as int) + 1), x -> x * 22)
);
23/09/14 09:27:25 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
java.lang.IllegalArgumentException: requirement failed: Nondeterministic expression org.apache.spark.sql.catalyst.expressions.Rand should be initialized before eval.
at scala.Predef$.require(Predef.scala:281)
at org.apache.spark.sql.catalyst.expressions.Nondeterministic.eval(Expression.scala:497)
at org.apache.spark.sql.catalyst.expressions.Nondeterministic.eval$(Expression.scala:495)
at org.apache.spark.sql.catalyst.expressions.RDG.eval(randomExpressions.scala:35)
at org.apache.spark.sql.catalyst.expressions.BinaryArithmetic.eval(arithmetic.scala:384)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:543)
at org.apache.spark.sql.catalyst.expressions.BinaryArithmetic.eval(arithmetic.scala:384)
at org.apache.spark.sql.catalyst.expressions.Sequence.eval(collectionOperations.scala:3062)
at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:275)
at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:274)
at org.apache.spark.sql.catalyst.expressions.ArrayTransform.eval(higherOrderFunctions.scala:308)
at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:375)
at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108)
...
```
However, this query succeeds:
```
select *
from explode(
sequence(0, cast(rand()*1000 as int) + 1)
);
0
1
2
3
...
801
802
803
```
The difference is that `transform` disables whole-stage codegen, so `GenerateExec` falls back to its interpreted execution path (`doExecute`). That path exposes a bug: non-deterministic expressions nested in the generator function's input (here, `rand()`) are not initialized before they are evaluated.
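This PR fixes the bug in `GenerateExec` by initializing every `Nondeterministic` expression in the bound generator with the partition index at the start of each partition, before any rows are generated.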
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
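New unit test in `GeneratorFunctionSuite` that evaluates `explode(array(rand(0)))` with whole-stage codegen disabled.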
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42933 from bersprockets/nondeterm_issue.
Lead-authored-by: Bruce Robbins <be...@gmail.com>
Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.../main/scala/org/apache/spark/sql/execution/GenerateExec.scala | 4 ++++
.../test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala | 7 +++++++
2 files changed, 11 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index f6dbf5fda18..b99361437e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -78,6 +78,10 @@ case class GenerateExec(
// boundGenerator.terminate() should be triggered after all of the rows in the partition
val numOutputRows = longMetric("numOutputRows")
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
+ boundGenerator.foreach {
+ case n: Nondeterministic => n.initialize(index)
+ case _ =>
+ }
val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
val rows = if (requiredChildOutput.nonEmpty) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 24061b4c7c2..68f63feb5c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -536,6 +536,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
checkAnswer(df,
Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
}
+
+ test("SPARK-45171: Handle evaluated nondeterministic expression") {
+ withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") {
+ val df = sql("select explode(array(rand(0)))")
+ checkAnswer(df, Row(0.7604953758285915d))
+ }
+ }
}
case class EmptyGenerator() extends Generator with LeafLike[Expression] {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org