Posted to commits@spark.apache.org by gu...@apache.org on 2022/08/18 03:25:02 UTC
[spark] branch branch-3.2 updated: [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new fcec11ad932 [SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
fcec11ad932 is described below
commit fcec11ad9329553f4bea024227bdc6468da85278
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Thu Aug 18 12:23:02 2022 +0900
[SPARK-40121][PYTHON][SQL] Initialize projection used for Python UDF
This PR proposes to initialize the projection so non-deterministic expressions can be evaluated with Python UDFs.
This makes Python UDFs work with non-deterministic expressions.
Yes. Python UDFs can now take non-deterministic expressions such as `rand()` as input:
```python
from pyspark.sql.functions import udf, rand
spark.range(10).select(udf(lambda x: x, "double")(rand())).show()
```
**Before**
```
java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:126)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1161)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1176)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1213)
```
**After**
```
+----------------------------------+
|<lambda>rand(-2507211707257730645)|
+----------------------------------+
|                0.7691724424045242|
|               0.09602244075319044|
|                0.3006471278112862|
|                0.4182649571961977|
|               0.29349096650900974|
|                0.7987097908937618|
|                0.5324802583101007|
|                  0.72460930912789|
|                0.1367749768412846|
|               0.17277322931919348|
+----------------------------------+
```
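The failure mode can be modelled in plain Python. The sketch below is a simplified illustration, not Spark's implementation: `ToyRand`, `ToyProjection`, and `run_partition` are hypothetical names, and the `RuntimeError` stands in for the `NullPointerException` shown in the trace. It assumes a non-deterministic expression keeps per-partition state (here a seeded random generator) that exists only after `initialize` has been called with the partition index.

```python
import random


class ToyRand:
    """Toy stand-in for a non-deterministic expression such as rand()."""

    def __init__(self, seed):
        self.seed = seed
        self.rng = None  # per-partition state, absent until initialize()

    def initialize(self, partition_index):
        # Seed per partition so each partition draws its own sequence.
        self.rng = random.Random(self.seed + partition_index)

    def eval(self, row):
        if self.rng is None:
            # Plays the role of the NullPointerException in the trace.
            raise RuntimeError("expression used before initialize()")
        return self.rng.random()


class ToyProjection:
    """Toy stand-in for a projection over a list of expressions."""

    def __init__(self, expressions):
        self.expressions = expressions

    def initialize(self, partition_index):
        # Forward the partition index to every expression.
        for expr in self.expressions:
            expr.initialize(partition_index)

    def apply(self, row):
        return [expr.eval(row) for expr in self.expressions]


def run_partition(rows, partition_index, initialize=True):
    projection = ToyProjection([ToyRand(seed=42)])
    if initialize:
        # The step this commit adds before rows are projected.
        projection.initialize(partition_index)
    return [projection.apply(row) for row in rows]
```

Calling `run_partition(range(10), 0)` returns ten single-element rows, while `run_partition(range(10), 0, initialize=False)` raises on the first row, which mirrors the before and after behaviour in this toy model.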
Manually tested, and a unit test was added.
Closes #37552 from HyukjinKwon/SPARK-40121.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
(cherry picked from commit 336c9bc535895530cc3983b24e7507229fa9570d)
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/tests/test_udf.py | 8 +++++++-
.../org/apache/spark/sql/execution/python/EvalPythonExec.scala | 1 +
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py
index fc475f1121d..5e6738a2f8e 100644
--- a/python/pyspark/sql/tests/test_udf.py
+++ b/python/pyspark/sql/tests/test_udf.py
@@ -23,7 +23,7 @@ import unittest

 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import udf
+from pyspark.sql.functions import udf, rand
 from pyspark.sql.udf import UserDefinedFunction
 from pyspark.sql.types import StringType, IntegerType, BooleanType, DoubleType, LongType, \
     ArrayType, StructType, StructField
@@ -705,6 +705,12 @@ class UDFTests(ReusedSQLTestCase):
         finally:
             shutil.rmtree(path)

+    def test_udf_with_rand(self):
+        # SPARK-40121: rand() with Python UDF.
+        self.assertEqual(
+            len(self.spark.range(10).select(udf(lambda x: x, DoubleType())(rand())).collect()), 10
+        )
+

 class UDFInitializationTests(unittest.TestCase):
     def tearDown(self):
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index fca43e454bf..1447788a609 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -116,6 +116,7 @@ trait EvalPythonExec extends UnaryExecNode {
         }.toArray
       }.toArray
       val projection = MutableProjection.create(allInputs.toSeq, child.output)
+      projection.initialize(context.partitionId())
       val schema = StructType(dataTypes.zipWithIndex.map { case (dt, i) =>
         StructField(s"_$i", dt)
       }.toSeq)