Posted to commits@spark.apache.org by ru...@apache.org on 2022/08/10 11:34:25 UTC

[spark] branch master updated: [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f8b15395cf3 [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL
f8b15395cf3 is described below

commit f8b15395cf347b6c6c6a4a20077fdeb31bfabb24
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Aug 10 19:33:47 2022 +0800

    [SPARK-39734][SPARK-36259][SPARK-39733][SPARK-37348][PYTHON] Functions Parity between PySpark and SQL
    
    ### What changes were proposed in this pull request?
    Implement the missing functions in PySpark:
    
    -  call_udf
    -  localtimestamp
    -  map_contains_key
    -  pmod
    
    After this PR, all functions in `org.apache.spark.sql.functions` can be found in `pyspark.sql.functions` or have equivalents (e.g. `not` -> `~`).
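
    A minimal usage sketch of the four new functions (assuming a Spark 3.4.0+ `SparkSession` bound to `spark`; the column names and the `plus_one` UDF below are only illustrative):

    ```python
    from pyspark.sql import functions as F

    df = spark.createDataFrame([(7, 3, {"a": 1})], ["x", "y", "m"])

    # pmod: positive remainder of x mod y
    df.select(F.pmod("x", "y")).show()

    # localtimestamp: current timestamp without time zone, constant within a query
    df.select(F.localtimestamp()).show()

    # map_contains_key: whether the map column contains the given key
    df.select(F.map_contains_key("m", "a")).show()

    # call_udf: invoke a UDF registered by name
    _ = spark.udf.register("plus_one", lambda v: v + 1, "int")
    df.select(F.call_udf("plus_one", "x")).show()
    ```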
    
    ### Why are the changes needed?
    For function parity between `org.apache.spark.sql.functions` and `pyspark.sql.functions`.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, 4 new APIs are added: `call_udf`, `localtimestamp`, `map_contains_key`, and `pmod`.
    
    ### How was this patch tested?
    Added doctests for the new functions.
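
    A sketch of checking those doctests locally (the Spark repo's own test runner is the canonical path; this assumes a local PySpark 3.4.0+ install, and only the four new functions are exercised so that `spark` is the only doctest global needed):

    ```python
    import doctest

    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F

    # Stand-in for the `spark` global the doctests expect.
    spark = SparkSession.builder.master("local[2]").appName("doctest-check").getOrCreate()

    finder = doctest.DocTestFinder()
    runner = doctest.DocTestRunner(optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
    for fn in (F.pmod, F.localtimestamp, F.map_contains_key, F.call_udf):
        for test in finder.find(fn, globs={"spark": spark}):
            runner.run(test)

    spark.stop()
    assert runner.summarize().failed == 0
    ```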
    
    Closes #37449 from zhengruifeng/py_func_parity.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../source/reference/pyspark.sql/functions.rst     |   4 +
 python/pyspark/sql/functions.py                    | 138 ++++++++++++++++++++-
 python/pyspark/sql/tests/test_functions.py         |   7 +-
 3 files changed, 142 insertions(+), 7 deletions(-)

diff --git a/python/docs/source/reference/pyspark.sql/functions.rst b/python/docs/source/reference/pyspark.sql/functions.rst
index ea495445426..a799bb8ad0a 100644
--- a/python/docs/source/reference/pyspark.sql/functions.rst
+++ b/python/docs/source/reference/pyspark.sql/functions.rst
@@ -84,6 +84,7 @@ Math Functions
     log10
     log1p
     log2
+    pmod
     pow
     rint
     round
@@ -125,6 +126,7 @@ Datetime Functions
     quarter
     month
     last_day
+    localtimestamp
     minute
     months_between
     next_day
@@ -188,6 +190,7 @@ Collection Functions
     flatten
     sequence
     array_repeat
+    map_contains_key
     map_keys
     map_values
     map_entries
@@ -326,6 +329,7 @@ UDF
 .. autosummary::
     :toctree: api/
 
+    call_udf
     pandas_udf
     udf
     unwrap_udt
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index e73c70d8ca0..9dd81145243 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1077,6 +1077,45 @@ def pow(col1: Union["ColumnOrName", float], col2: Union["ColumnOrName", float])
     return _invoke_binary_math_function("pow", col1, col2)
 
 
+def pmod(dividend: Union["ColumnOrName", float], divisor: Union["ColumnOrName", float]) -> Column:
+    """
+    Returns the positive value of dividend mod divisor.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    dividend : str, :class:`~pyspark.sql.Column` or float
+        the column that contains dividend, or the specified dividend value
+    divisor : str, :class:`~pyspark.sql.Column` or float
+        the column that contains divisor, or the specified divisor value
+
+    Examples
+    --------
+    >>> from pyspark.sql.functions import pmod
+    >>> df = spark.createDataFrame([
+    ...     (1.0, float('nan')), (float('nan'), 2.0), (10.0, 3.0),
+    ...     (float('nan'), float('nan')), (-3.0, 4.0), (-10.0, 3.0),
+    ...     (-5.0, -6.0), (7.0, -8.0), (1.0, 2.0)],
+    ...     ("a", "b"))
+    >>> df.select(pmod("a", "b")).show()
+    +----------+
+    |pmod(a, b)|
+    +----------+
+    |       NaN|
+    |       NaN|
+    |       1.0|
+    |       NaN|
+    |       1.0|
+    |       2.0|
+    |      -5.0|
+    |       7.0|
+    |       1.0|
+    +----------+
+    """
+    return _invoke_binary_math_function("pmod", dividend, divisor)
+
+
 @since(1.6)
 def row_number() -> Column:
     """
@@ -1997,6 +2036,28 @@ def current_timestamp() -> Column:
     return _invoke_function("current_timestamp")
 
 
+def localtimestamp() -> Column:
+    """
+    Returns the current timestamp without time zone at the start of query evaluation
+    as a timestamp without time zone column. All calls of localtimestamp within the
+    same query return the same value.
+
+    .. versionadded:: 3.4.0
+
+    Examples
+    --------
+    >>> from pyspark.sql.functions import localtimestamp
+    >>> df = spark.range(0, 100)
+    >>> df.select(localtimestamp()).distinct().show()
+    +--------------------+
+    |    localtimestamp()|
+    +--------------------+
+    |20...-...-... ...:...:...|
+    +--------------------+
+    """
+    return _invoke_function("localtimestamp")
+
+
 def date_format(date: "ColumnOrName", format: str) -> Column:
     """
     Converts a date/timestamp/string to a value of string in the format specified by the date
@@ -4441,6 +4502,39 @@ def flatten(col: "ColumnOrName") -> Column:
     return _invoke_function_over_columns("flatten", col)
 
 
+def map_contains_key(col: "ColumnOrName", value: Any) -> Column:
+    """
+    Returns true if the map contains the key.
+
+    Parameters
+    ----------
+    col : :class:`~pyspark.sql.Column` or str
+        name of column or expression
+    value :
+        a literal value
+
+    .. versionadded:: 3.4.0
+
+    Examples
+    --------
+    >>> from pyspark.sql.functions import map_contains_key
+    >>> df = spark.sql("SELECT map(1, 'a', 2, 'b') as data")
+    >>> df.select(map_contains_key("data", 1)).show()
+    +---------------------------------+
+    |array_contains(map_keys(data), 1)|
+    +---------------------------------+
+    |                             true|
+    +---------------------------------+
+    >>> df.select(map_contains_key("data", -1)).show()
+    +----------------------------------+
+    |array_contains(map_keys(data), -1)|
+    +----------------------------------+
+    |                             false|
+    +----------------------------------+
+    """
+    return _invoke_function("map_contains_key", _to_java_column(col), value)
+
+
 def map_keys(col: "ColumnOrName") -> Column:
     """
     Collection function: Returns an unordered array containing the keys of the map.
@@ -5404,11 +5498,53 @@ def bucket(numBuckets: Union[Column, int], col: "ColumnOrName") -> Column:
     return _invoke_function("bucket", numBuckets, _to_java_column(col))
 
 
+def call_udf(udfName: str, *cols: "ColumnOrName") -> Column:
+    """
+    Call a user-defined function.
+
+    .. versionadded:: 3.4.0
+
+    Parameters
+    ----------
+    udfName : str
+        name of the user-defined function (UDF)
+    cols : :class:`~pyspark.sql.Column` or str
+        column names or :class:`~pyspark.sql.Column`\\s to be used in the UDF
+
+    Examples
+    --------
+    >>> from pyspark.sql.functions import call_udf, col
+    >>> from pyspark.sql.types import IntegerType, StringType
+    >>> df = spark.createDataFrame([(1, "a"),(2, "b"), (3, "c")],["id", "name"])
+    >>> _ = spark.udf.register("intX2", lambda i: i * 2, IntegerType())
+    >>> df.select(call_udf("intX2", "id")).show()
+    +---------+
+    |intX2(id)|
+    +---------+
+    |        2|
+    |        4|
+    |        6|
+    +---------+
+    >>> _ = spark.udf.register("strX2", lambda s: s * 2, StringType())
+    >>> df.select(call_udf("strX2", col("name"))).show()
+    +-----------+
+    |strX2(name)|
+    +-----------+
+    |         aa|
+    |         bb|
+    |         cc|
+    +-----------+
+    """
+    sc = SparkContext._active_spark_context
+    assert sc is not None and sc._jvm is not None
+    return _invoke_function("call_udf", udfName, _to_seq(sc, cols, _to_java_column))
+
+
 def unwrap_udt(col: "ColumnOrName") -> Column:
     """
     Unwrap UDT data type column into its underlying type.
 
-        .. versionadded:: 3.4.0
+    .. versionadded:: 3.4.0
 
     """
     return _invoke_function("unwrap_udt", _to_java_column(col))
diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py
index 5091fa711a8..44d95372d9d 100644
--- a/python/pyspark/sql/tests/test_functions.py
+++ b/python/pyspark/sql/tests/test_functions.py
@@ -100,12 +100,7 @@ class FunctionsTests(ReusedSQLTestCase):
         missing_in_py = jvm_fn_set.difference(py_fn_set)
 
         # Functions that we expect to be missing in python until they are added to pyspark
-        expected_missing_in_py = {
-            "call_udf",  # TODO(SPARK-39734)
-            "localtimestamp",  # TODO(SPARK-36259)
-            "map_contains_key",  # TODO(SPARK-39733)
-            "pmod",  # TODO(SPARK-37348)
-        }
+        expected_missing_in_py = set()
 
         self.assertEqual(
             expected_missing_in_py, missing_in_py, "Missing functions in pyspark not as expected"

