You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/03/29 05:00:06 UTC
[spark] branch master updated: [SPARK-31287][PYTHON][SQL] Ignore
type hints in groupby.(cogroup.)applyInPandas and mapInPandas
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3165a95 [SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
3165a95 is described below
commit 3165a95a04448546ae8955020566d718c6960223
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Sun Mar 29 13:59:18 2020 +0900
[SPARK-31287][PYTHON][SQL] Ignore type hints in groupby.(cogroup.)applyInPandas and mapInPandas
### What changes were proposed in this pull request?
This PR proposes to make pandas function APIs (`groupby.(cogroup.)applyInPandas` and `mapInPandas`) ignore Python type hints.
### Why are the changes needed?
Python type hints are optional, so they shouldn't affect APIs where pandas UDFs are not used.
This also leaves room for these APIs to support other type hints in the future. At the very least, we shouldn't throw an exception for now.
### Does this PR introduce any user-facing change?
No, it's a master-only change.
```python
import pandas as pd
def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf + 1
spark.range(10).groupby('id').applyInPandas(pandas_plus_one, schema="id long").show()
```
```python
import pandas as pd
def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return left + 1
spark.range(10).groupby('id').cogroup(spark.range(10).groupby("id")).applyInPandas(pandas_plus_one, schema="id long").show()
```
```python
from typing import Iterator
import pandas as pd
def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    return map(lambda v: v + 1, iter)
spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
```
**Before:**
An exception is raised.
**After:**
```
+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
| 10|
+---+
```
### How was this patch tested?
Unit tests were added to `test_pandas_udf_typehints.py`.
Closes #28052 from HyukjinKwon/SPARK-31287.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: HyukjinKwon <gu...@apache.org>
---
python/pyspark/sql/pandas/functions.py | 8 +++++
.../pyspark/sql/tests/test_pandas_udf_typehints.py | 42 ++++++++++++++++++++++
2 files changed, 50 insertions(+)
diff --git a/python/pyspark/sql/pandas/functions.py b/python/pyspark/sql/pandas/functions.py
index 31aa321..f43ebf8 100644
--- a/python/pyspark/sql/pandas/functions.py
+++ b/python/pyspark/sql/pandas/functions.py
@@ -384,6 +384,14 @@ def _create_pandas_udf(f, returnType, evalType):
"In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for "
"pandas UDF instead of specifying pandas UDF type which will be deprecated "
"in the future releases. See SPARK-28264 for more details.", UserWarning)
+ elif evalType in [PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+ PythonEvalType.SQL_MAP_PANDAS_ITER_UDF,
+ PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF]:
+ # In case of 'SQL_GROUPED_MAP_PANDAS_UDF', deprecation warning is being triggered
+ # at `apply` instead.
+ # In case of 'SQL_MAP_PANDAS_ITER_UDF' and 'SQL_COGROUPED_MAP_PANDAS_UDF', the
+ # evaluation type will always be set.
+ pass
elif len(argspec.annotations) > 0:
evalType = infer_eval_type(signature(f))
assert evalType is not None
diff --git a/python/pyspark/sql/tests/test_pandas_udf_typehints.py b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
index 7c83c78..2582080 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_typehints.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_typehints.py
@@ -261,6 +261,48 @@ class PandasUDFTypeHintsTests(ReusedSQLTestCase):
expected = df.groupby('id').agg(mean(df.v).alias('weighted_mean(v, 1.0)')).sort('id')
assert_frame_equal(expected.toPandas(), actual.toPandas())
+ def test_ignore_type_hint_in_group_apply_in_pandas(self):
+ df = self.spark.range(10)
+ exec(
+ "def pandas_plus_one(v: pd.DataFrame) -> pd.DataFrame:\n"
+ " return v + 1",
+ self.local)
+
+ pandas_plus_one = self.local["pandas_plus_one"]
+
+ actual = df.groupby('id').applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+ expected = df.selectExpr("id + 1 as id")
+ assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+ def test_ignore_type_hint_in_cogroup_apply_in_pandas(self):
+ df = self.spark.range(10)
+ exec(
+ "def pandas_plus_one(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:\n"
+ " return left + 1",
+ self.local)
+
+ pandas_plus_one = self.local["pandas_plus_one"]
+
+ actual = df.groupby('id').cogroup(
+ self.spark.range(10).groupby("id")
+ ).applyInPandas(pandas_plus_one, schema=df.schema).sort('id')
+ expected = df.selectExpr("id + 1 as id")
+ assert_frame_equal(expected.toPandas(), actual.toPandas())
+
+ def test_ignore_type_hint_in_map_in_pandas(self):
+ df = self.spark.range(10)
+ exec(
+ "from typing import Iterator\n"
+ "def pandas_plus_one(iter: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:\n"
+ " return map(lambda v: v + 1, iter)",
+ self.local)
+
+ pandas_plus_one = self.local["pandas_plus_one"]
+
+ actual = df.mapInPandas(pandas_plus_one, schema=df.schema)
+ expected = df.selectExpr("id + 1 as id")
+ assert_frame_equal(expected.toPandas(), actual.toPandas())
+
if __name__ == "__main__":
from pyspark.sql.tests.test_pandas_udf_typehints import *
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org