You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/18 03:25:41 UTC

[spark] branch branch-3.1 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 07cc6a8963d [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
07cc6a8963d is described below

commit 07cc6a8963d9bd26d5ec0738ca4fa4767cbfac63
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Thu Aug 18 12:23:02 2022 +0900

    [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
    
    This PR proposes to initialize the projection so non-deterministic expressions can be evaluated with Python UDFs.
    
    To make the Python UDF working with non-deterministic expressions.
    
    Yes.
    
    ```python
    from pyspark.sql.functions import udf, rand
    spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
    ```
    
    **Before**
    
    ```
    java.lang.NullPointerException
            at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
            at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
            at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
            at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
            at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
    ```
    
    **After**
    
    ```
    +----------------------------------+
    |<lambda>rand(-2507211707257730645)|
    +----------------------------------+
    |                0.7691724424045242|
    |               0.09602244075319044|
    |                0.3006471278112862|
    |                0.4182649571961977|
    |               0.29349096650900974|
    |                0.7987097908937618|
    |                0.5324802583101007|
    |                  0.72460930912789|
    |                0.1367749768412846|
    |               0.17277322931919348|
    +----------------------------------+
    ```
    
    Manually tested, and unittest was added.
    
    Closes #37552 from HyukjinKwon/SPARK-40121.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/tests/test_udf.py                              | 8 +++++++-
 .../org/apache/spark/sql/execution/python/EvalPythonExec.scala    | 1 +
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index 0d13361dcab..47d5efd441f 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, DoubleType, LongType, \
     ArrayType, StructType, StructField
@@ -685,6 +685,12 @@ class UDFTests(ReusedSQLTestCase):
         self.assertEqual(result.collect(),
                          [Row(c1=Row(_1=1.0, _2=1.0), c2=Row(_1=1, _2=1), c3=1.0, c4=1)])
 
+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, DoubleType())(rand())).collect()), 10
+        )
+
 
 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
         }.toArray
       }.toArray
       val projection = MutableProjection.create(allInputs.toSeq, child.output)
+      projection.initialize(context.partitionId())
       val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
         StructField(s"_$i", dt)
       }.toSeq)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org