You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/29 05:00:06 UTC

[spark] branch master updated: [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3165a95  [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
3165a95 is described below

commit 3165a95a04448546ae8955020566d718c6960223
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Sun Mar 29 13:59:18 2020 +0900

    [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to make the pandas function APIs (`groupby.(cogroup.)applyInPandas` and `mapInPandas`) ignore Python type hints.
    
    ### Why are the changes needed?
    
    Python type hints are optional, so they should not affect APIs where pandas UDFs are not used.
    Supporting other type hints in these APIs is left as future work; at the very least, they should not throw an exception for now.
    
    ### Does this PR introduce any user-facing change?
    
    No, this is a master-only change.
    
    ```python
    import pandas as pd
    
    def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
        return pdf + 1
    
    spark.range(10).groupby('id').applyInPandas(pandas_plus_one, schema="id long").show()
    ```
    ```python
    import pandas as pd
    
    def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
        return left + 1
    
    spark.range(10).groupby('id').cogroup(spark.range(10).groupby("id")).applyInPandas(pandas_plus_one, schema="id long").show()
    ```
    
    ```python
    from typing import Iterator
    import pandas as pd
    
    def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        return map(lambda v: v + 1, iter)
    
    spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
    ```
    
    **Before:**
    
    Exception
    
    **After:**
    
    ```
    +---+
    | id|
    +---+
    |  1|
    |  2|
    |  3|
    |  4|
    |  5|
    |  6|
    |  7|
    |  8|
    |  9|
    | 10|
    +---+
    ```
    
    ### How was this patch tested?
    
    Unit tests were added to `python/pyspark/sql/tests/test_pandas_udf_typehints.py`.
    
    Closes #28052 from HyukjinKwon/SPARK-31287.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 python/pyspark/sql/pandas/functions.py             |  8 +++++
 .../pyspark/sql/tests/test_pandas_udf_typehints.py | 42 ++++++++++++++++++++++
 2 files changed, 50 insertions(+)

diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index 31aa321..f43ebf8 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -384,6 +384,14 @@ def _create_pandas_udf(f, returnType, evalType):
                 "In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for "
                 "pandas UDF instead of specifying pandas UDF type which will be deprecated "
                 "in the future releases. See SPARK-28264 for more details.", UserWarning)
+        elif evalType in [PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                          PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+                          PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]:
+            # In case of 'SQL_GROUPED_MAP_PANDAS_UDF',  deprecation warning is being triggered
+            # at `apply` instead.
+            # In case of 'SQL_MAP_PANDAS_ITER_UDF' and 'SQL_COGROUPED_MAP_PANDAS_UDF', the
+            # evaluation type will always be set.
+            pass
         elif len(argspec.annotations) > 0:
             evalType = infer_eval_type(signature(f))
             assert evalType is not None
diff --git a/python/pyspark/sql/tests/test_pandas_udf_typehints.py b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
index 7c83c78..2582080 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_typehints.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
@@ -261,6 +261,48 @@ class PandasUDFTypeHintsTests(ReusedSQLTestCase):
         expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id')
         assert_frame_equal(expected.toPandas(), actual.toPandas())
 
+    def test_ignore_type_hint_in_group_apply_in_pandas(self):
+        df = self.spark.range(10)
+        exec(
+            "def pandas_plus_one(v: pd.DataFrame) -> pd.DataFrame:\n"
+            "    return v + 1",
+            self.local)
+
+        pandas_plus_one = self.local["pandas_plus_one"]
+
+        actual = df.groupby('id').applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+        expected = df.selectExpr("id + 1 as id")
+        assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+    def test_ignore_type_hint_in_cogroup_apply_in_pandas(self):
+        df = self.spark.range(10)
+        exec(
+            "def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:\n"
+            "    return left + 1",
+            self.local)
+
+        pandas_plus_one = self.local["pandas_plus_one"]
+
+        actual = df.groupby('id').cogroup(
+            self.spark.range(10).groupby("id")
+        ).applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+        expected = df.selectExpr("id + 1 as id")
+        assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+    def test_ignore_type_hint_in_map_in_pandas(self):
+        df = self.spark.range(10)
+        exec(
+            "from typing import Iterator\n"
+            "def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:\n"
+            "    return map(lambda v: v + 1, iter)",
+            self.local)
+
+        pandas_plus_one = self.local["pandas_plus_one"]
+
+        actual = df.mapInPandas(pandas_plus_one, schema=df.schema)
+        expected = df.selectExpr("id + 1 as id")
+        assert_frame_equal(expected.toPandas(), actual.toPandas())
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_udf_typehints import *


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org