Posted to commits@spark.apache.org by ru...@apache.org on 2022/08/10 11:34:25 UTC
[spark] branch master updated: [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f8b15395cf3 [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL
f8b15395cf3 is described below
commit f8b15395cf347b6c6c6a4a20077fdeb31bfabb24
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Aug 10 19:33:47 2022 +0800
[SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL
### What changes were proposed in this pull request?
Implement the missing functions in PySpark:
- call_udf
- localtimestamp
- map_contains_key
- pmod
After this PR, all functions in `org.apache.spark.sql.functions` can be found in `pyspark.sql.functions` or have equivalents (e.g. `not` -> `~`).
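A minimal usage sketch of the four new APIs, distilled from the doctests added below (assumes an active SparkSession bound to `spark`; the `plus_one` UDF name is illustrative, not part of this patch):

    >>> from pyspark.sql import functions as F
    >>> df = spark.createDataFrame([(-10.0, 3.0)], ("a", "b"))
    >>> df.select(F.pmod("a", "b")).first()[0]             # positive remainder
    2.0
    >>> spark.range(5).select(F.localtimestamp()).distinct().count()  # one shared value per query
    1
    >>> mdf = spark.sql("SELECT map(1, 'a', 2, 'b') AS data")
    >>> mdf.select(F.map_contains_key("data", 2)).first()[0]
    True
    >>> _ = spark.udf.register("plus_one", lambda x: x + 1, "int")
    >>> spark.range(1).select(F.call_udf("plus_one", "id")).first()[0]
    1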
### Why are the changes needed?
For function parity between `org.apache.spark.sql.functions` and `pyspark.sql.functions`.
### Does this PR introduce _any_ user-facing change?
Yes, 4 new APIs are added.
### How was this patch tested?
Added doctests.
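Locally, the new doctests can be exercised with the Python test runner from a Spark dev checkout (a sketch; assumes the standard developer tooling):

    ./python/run-tests --testnames 'pyspark.sql.functions'
    ./python/run-tests --testnames 'pyspark.sql.tests.test_functions'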
Closes #37449 from zhengruifeng/py_func_parity.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../source/reference/pyspark.sql/functions.rst | 4 +
python/pyspark/sql/functions.py | 138 ++++++++++++++++++++-
python/pyspark/sql/tests/test_functions.py | 7 +-
3 files changed, 142 insertions(+), 7 deletions(-)
diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index ea495445426..a799bb8ad0a 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -84,6 +84,7 @@ Math Functions
log10
log1p
log2
+ pmod
pow
rint
round
@@ -125,6 +126,7 @@ Datetime Functions
quarter
month
last_day
+ localtimestamp
minute
months_between
next_day
@@ -188,6 +190,7 @@ Collection Functions
flatten
sequence
array_repeat
+ map_contains_key
map_keys
map_values
map_entries
@@ -326,6 +329,7 @@ UDF
.. autosummary::
:toctree: api/
+ call_udf
pandas_udf
udf
unwrap_udt
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e73c70d8ca0..9dd81145243 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1077,6 +1077,45 @@ def pow(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float])
return _invoke_binary_math_function("pow", col1, col2)
+def pmod(dividend: Union["ColumnOrName", float], divisor: Union["ColumnOrName", float]) -> Column:
+ """
+ Returns the positive value of dividend mod divisor.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ dividend : str, :class:`~pyspark.sql.Column` or float
+ the column that contains dividend, or the specified dividend value
+ divisor : str, :class:`~pyspark.sql.Column` or float
+ the column that contains divisor, or the specified divisor value
+
+ Examples
+ --------
+ >>> from pyspark.sql.functions import pmod
+ >>> df = spark.createDataFrame([
+ ... (1.0, float('nan')), (float('nan'), 2.0), (10.0, 3.0),
+ ... (float('nan'), float('nan')), (-3.0, 4.0), (-10.0, 3.0),
+ ... (-5.0, -6.0), (7.0, -8.0), (1.0, 2.0)],
+ ... ("a", "b"))
+ >>> df.select(pmod("a", "b")).show()
+ +----------+
+ |pmod(a, b)|
+ +----------+
+ | NaN|
+ | NaN|
+ | 1.0|
+ | NaN|
+ | 1.0|
+ | 2.0|
+ | -5.0|
+ | 7.0|
+ | 1.0|
+ +----------+
+ """
+ return _invoke_binary_math_function("pmod", dividend, divisor)
+
+
@since(1.6)
def row_number() -> Column:
"""
@@ -1997,6 +2036,28 @@ def current_timestamp() -> Column:
return _invoke_function("current_timestamp")
+def localtimestamp() -> Column:
+ """
+ Returns the current timestamp without time zone at the start of query evaluation
+ as a timestamp without time zone column. All calls of localtimestamp within the
+ same query return the same value.
+
+ .. versionadded:: 3.4.0
+
+ Examples
+ --------
+ >>> from pyspark.sql.functions import localtimestamp
+ >>> df = spark.range(0, 100)
+ >>> df.select(localtimestamp()).distinct().show()
+ +--------------------+
+ | localtimestamp()|
+ +--------------------+
+ |20...-...-... ...:...:...|
+ +--------------------+
+ """
+ return _invoke_function("localtimestamp")
+
+
def date_format(date: "ColumnOrName", format: str) -> Column:
"""
Converts a date/timestamp/string to a value of string in the format specified by the date
@@ -4441,6 +4502,39 @@ def flatten(col: "ColumnOrName") -> Column:
return _invoke_function_over_columns("flatten", col)
+def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
+ """
+ Returns true if the map contains the key.
+
+ Parameters
+ ----------
+ col : :class:`~pyspark.sql.Column` or str
+ name of column or expression
+ value :
+ a literal value
+
+ .. versionadded:: 3.4.0
+
+ Examples
+ --------
+ >>> from pyspark.sql.functions import map_contains_key
+ >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as data")
+ >>> df.select(map_contains_key("data", 1)).show()
+ +---------------------------------+
+ |array_contains(map_keys(data), 1)|
+ +---------------------------------+
+ | true|
+ +---------------------------------+
+ >>> df.select(map_contains_key("data", -1)).show()
+ +----------------------------------+
+ |array_contains(map_keys(data), -1)|
+ +----------------------------------+
+ | false|
+ +----------------------------------+
+ """
+ return _invoke_function("map_contains_key", _to_java_column(col), value)
+
+
def map_keys(col: "ColumnOrName") -> Column:
"""
Collection function: Returns an unordered array containing the keys of the map.
@@ -5404,11 +5498,53 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
return _invoke_function("bucket", numBuckets, _to_java_column(col))
+def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
+ """
+ Call a user-defined function.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ udfName : str
+ name of the user defined function (UDF)
+ cols : :class:`~pyspark.sql.Column` or str
+ column names or :class:`~pyspark.sql.Column`\\s to be used in the UDF
+
+ Examples
+ --------
+ >>> from pyspark.sql.functions import call_udf, col
+ >>> from pyspark.sql.types import IntegerType, StringType
+ >>> df = spark.createDataFrame([(1, "a"),(2, "b"), (3, "c")],["id", "name"])
+ >>> _ = spark.udf.register("intX2", lambda i: i * 2, IntegerType())
+ >>> df.select(call_udf("intX2", "id")).show()
+ +---------+
+ |intX2(id)|
+ +---------+
+ | 2|
+ | 4|
+ | 6|
+ +---------+
+ >>> _ = spark.udf.register("strX2", lambda s: s * 2, StringType())
+ >>> df.select(call_udf("strX2", col("name"))).show()
+ +-----------+
+ |strX2(name)|
+ +-----------+
+ | aa|
+ | bb|
+ | cc|
+ +-----------+
+ """
+ sc = SparkContext._active_spark_context
+ assert sc is not None and sc._jvm is not None
+ return _invoke_function("call_udf", udfName, _to_seq(sc, cols, _to_java_column))
+
+
def unwrap_udt(col: "ColumnOrName") -> Column:
"""
Unwrap UDT data type column into its underlying type.
- .. versionadded:: 3.4.0
+ .. versionadded:: 3.4.0
"""
return _invoke_function("unwrap_udt", _to_java_column(col))
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 5091fa711a8..44d95372d9d 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -100,12 +100,7 @@ class FunctionsTests(ReusedSQLTestCase):
missing_in_py = jvm_fn_set.difference(py_fn_set)
# Functions that we expect to be missing in python until they are added to pyspark
- expected_missing_in_py = {
- "call_udf", # TODO(SPARK-39734)
- "localtimestamp", # TODO(SPARK-36259)
- "map_contains_key", # TODO(SPARK-39733)
- "pmod", # TODO(SPARK-37348)
- }
+ expected_missing_in_py = set()
self.assertEqual(
expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"
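For context, the parity check in this test boils down to a set difference between the JVM-side and Python-side function names. A condensed, standalone approximation (assumes an active SparkSession bound to `spark`; the real test collects the JVM names the same way through the Py4J gateway):

    from inspect import getmembers, isfunction
    from pyspark.sql import functions

    # Public functions exposed by pyspark.sql.functions
    py_fn_set = {name for name, _ in getmembers(functions, isfunction)
                 if not name.startswith("_")}
    # Static methods of org.apache.spark.sql.functions, seen through Py4J
    jvm_fn_set = {name for name, _ in getmembers(spark.sparkContext._jvm.functions)}
    missing = sorted(jvm_fn_set.difference(py_fn_set))
    print(missing)  # expected: [] after this patch (equivalents like `not` -> `~` aside)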