You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/27 19:26:13 UTC
spark git commit: [SPARK-23233][PYTHON] Reset the cache in
asNondeterministic to set deterministic properly
Repository: spark
Updated Branches:
refs/heads/master 6328868e5 -> 3227d14fe
[SPARK-23233][PYTHON] Reset the cache in asNondeterministic to set deterministic properly
## What changes were proposed in this pull request?
Reproducer:
```python
from pyspark.sql.functions import udf
f = udf(lambda x: x)
spark.range(1).select(f("id")) # cache JVM UDF instance.
f = f.asNondeterministic()
spark.range(1).select(f("id"))._jdf.logicalPlan().projectList().head().deterministic()
```
It should return `False` but the current master returns `True`. Seems it's because we cache the JVM UDF instance and then we reuse it even after setting `deterministic` disabled once it's called.
## How was this patch tested?
Manually tested. I am not sure if I should add the test with a lot of JVM accesses with the intetnal stuff .. Let me know if anyone feels so. I will add.
Author: hyukjinkwon <gu...@gmail.com>
Closes #20409 from HyukjinKwon/SPARK-23233.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3227d14f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3227d14f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3227d14f
Branch: refs/heads/master
Commit: 3227d14feb1a65e95a2bf326cff6ac95615cc5ac
Parents: 6328868
Author: hyukjinkwon <gu...@gmail.com>
Authored: Sat Jan 27 11:26:09 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Jan 27 11:26:09 2018 -0800
----------------------------------------------------------------------
python/pyspark/sql/tests.py | 13 +++++++++++++
python/pyspark/sql/udf.py | 3 +++
2 files changed, 16 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3227d14f/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a466ab8..ca7bbf8 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -441,6 +441,19 @@ class SQLTests(ReusedSQLTestCase):
pydoc.render_doc(random_udf1)
pydoc.render_doc(udf(lambda x: x).asNondeterministic)
+ def test_nondeterministic_udf3(self):
+ # regression test for SPARK-23233
+ from pyspark.sql.functions import udf
+ f = udf(lambda x: x)
+ # Here we cache the JVM UDF instance.
+ self.spark.range(1).select(f("id"))
+ # This should reset the cache to set the deterministic status correctly.
+ f = f.asNondeterministic()
+ # Check the deterministic status of udf.
+ df = self.spark.range(1).select(f("id"))
+ deterministic = df._jdf.logicalPlan().projectList().head().deterministic()
+ self.assertFalse(deterministic)
+
def test_nondeterministic_udf_in_aggregate(self):
from pyspark.sql.functions import udf, sum
import random
http://git-wip-us.apache.org/repos/asf/spark/blob/3227d14f/python/pyspark/sql/udf.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index de96846..4f30330 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -188,6 +188,9 @@ class UserDefinedFunction(object):
.. versionadded:: 2.3
"""
+ # Here, we explicitly clean the cache to create a JVM UDF instance
+ # with 'deterministic' updated. See SPARK-23233.
+ self._judf_placeholder = None
self.deterministic = False
return self
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org