You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2023/09/15 04:22:53 UTC

[spark] branch master updated: [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec`

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e097f916a27 [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec`
e097f916a27 is described below

commit e097f916a2769dfe82bfd216fedcd6962e8280c8
Author: Bruce Robbins <be...@gmail.com>
AuthorDate: Fri Sep 15 13:22:40 2023 +0900

    [SPARK-45171][SQL] Initialize non-deterministic expressions in `GenerateExec`
    
    ### What changes were proposed in this pull request?
    
    Before evaluating the generator function in `GenerateExec`, initialize any non-deterministic expressions in the bound generator, using the partition index.
    
    ### Why are the changes needed?
    
    The following query fails:
    ```
    select *
    from explode(
      transform(sequence(0, cast(rand()*1000 as int) + 1), x -> x * 22)
    );
    
    23/09/14 09:27:25 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3)
    java.lang.IllegalArgumentException: requirement failed: Nondeterministic expression org.apache.spark.sql.catalyst.expressions.Rand should be initialized before eval.
            at scala.Predef$.require(Predef.scala:281)
            at org.apache.spark.sql.catalyst.expressions.Nondeterministic.eval(Expression.scala:497)
            at org.apache.spark.sql.catalyst.expressions.Nondeterministic.eval$(Expression.scala:495)
            at org.apache.spark.sql.catalyst.expressions.RDG.eval(randomExpressions.scala:35)
            at org.apache.spark.sql.catalyst.expressions.BinaryArithmetic.eval(arithmetic.scala:384)
            at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:543)
            at org.apache.spark.sql.catalyst.expressions.BinaryArithmetic.eval(arithmetic.scala:384)
            at org.apache.spark.sql.catalyst.expressions.Sequence.eval(collectionOperations.scala:3062)
            at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval(higherOrderFunctions.scala:275)
            at org.apache.spark.sql.catalyst.expressions.SimpleHigherOrderFunction.eval$(higherOrderFunctions.scala:274)
            at org.apache.spark.sql.catalyst.expressions.ArrayTransform.eval(higherOrderFunctions.scala:308)
            at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:375)
            at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108)
    ...
    ```
    However, this query succeeds:
    ```
    select *
    from explode(
      sequence(0, cast(rand()*1000 as int) + 1)
    );
    
    0
    1
    2
    3
    ...
    801
    802
    803
    ```
    The difference is that `transform` turns off whole-stage codegen, so `GenerateExec` runs through its non-codegen `doExecute` path. That path has a bug: a non-deterministic expression passed to the generator function is not initialized before it is evaluated.
    
    This PR fixes the bug in `GenerateExec`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test in `GeneratorFunctionSuite` that runs `explode(array(rand(0)))` with whole-stage codegen disabled.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42933 from bersprockets/nondeterm_issue.
    
    Lead-authored-by: Bruce Robbins <be...@gmail.com>
    Co-authored-by: Hyukjin Kwon <gu...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../main/scala/org/apache/spark/sql/execution/GenerateExec.scala   | 4 ++++
 .../test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala   | 7 +++++++
 2 files changed, 11 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index f6dbf5fda18..b99361437e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -78,6 +78,10 @@ case class GenerateExec(
     // boundGenerator.terminate() should be triggered after all of the rows in the partition
     val numOutputRows = longMetric("numOutputRows")
     child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
+      boundGenerator.foreach {
+        case n: Nondeterministic => n.initialize(index)
+        case _ =>
+      }
       val generatorNullRow = new GenericInternalRow(generator.elementSchema.length)
       val rows = if (requiredChildOutput.nonEmpty) {
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index 24061b4c7c2..68f63feb5c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -536,6 +536,13 @@ class GeneratorFunctionSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df,
       Row(1, 1) :: Row(1, 2) :: Row(2, 2) :: Row(2, 3) :: Row(3, null) :: Nil)
   }
+
+  test("SPARK-45171: Handle evaluated nondeterministic expression") { // regression test for uninitialized Nondeterministic in GenerateExec
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false") { // non-codegen path: GenerateExec.doExecute evaluates the generator
+      val df = sql("select explode(array(rand(0)))") // rand(0) is a non-deterministic expression inside the generator's argument
+      checkAnswer(df, Row(0.7604953758285915d)) // fixed seed 0; before the fix this failed with "should be initialized before eval"
+    }
+  }
 }
 
 case class EmptyGenerator() extends Generator with LeafLike[Expression] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org