You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/03 03:26:26 UTC
[spark] branch master updated: [SPARK-36907][PYTHON] Fix
DataFrameGroupBy.apply without shortcut
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 38d3981 [SPARK-36907][PYTHON] Fix DataFrameGroupBy.apply without shortcut
38d3981 is described below
commit 38d39812c176e4b52a08397f7936f87ea32930e7
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Sun Oct 3 12:25:19 2021 +0900
[SPARK-36907][PYTHON] Fix DataFrameGroupBy.apply without shortcut
### What changes were proposed in this pull request?
Fix `DataFrameGroupBy.apply` without shortcut.
pandas' `DataFrameGroupBy.apply` behaves inconsistently when the UDF returns a `Series`: the shape of the result depends on whether there is only one group or more than one. For example:
```py
>>> pdf = pd.DataFrame(
... {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
... columns=["a", "b", "c"],
... )
>>> pdf.groupby('b').apply(lambda x: x['a'])
b
1  0    1
   1    2
2  2    3
3  3    4
5  4    5
8  5    6
Name: a, dtype: int64
>>> pdf[pdf['b'] == 1].groupby('b').apply(lambda x: x['a'])
a  0  1
b
1  1  2
```
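If there is only one group, pandas returns a "wide" `DataFrame` instead of a `Series`.
In our non-shortcut path, each call to the function always sees exactly one group, because it runs inside `groupby(...).applyInPandas(...)`. pandas therefore returns a wide `DataFrame`, which we must convert back to a `Series` ourselves. In addition, a warning is now emitted when the data used for return-type inference contains at most one group, suggesting to increase `compute.shortcut_limit`.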
### Why are the changes needed?
`DataFrameGroupBy.apply` without the shortcut could raise an exception when the applied function returns a `Series`.
```py
>>> ps.options.compute.shortcut_limit = 3
>>> psdf = ps.DataFrame(
... {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
... columns=["a", "b", "c"],
... )
>>> psdf.groupby("b").apply(lambda x: x["a"])
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
...
ValueError: Length mismatch: Expected axis has 2 elements, new values have 3 elements
```
### Does this PR introduce _any_ user-facing change?
Yes. The error above no longer occurs:
```py
>>> psdf.groupby("b").apply(lambda x: x["a"])
b
1  0    1
   1    2
2  2    3
3  3    4
5  4    5
8  5    6
Name: a, dtype: int64
```
### How was this patch tested?
Added tests.
Closes #34160 from ueshin/issues/SPARK-36907/groupby-apply.
Authored-by: Takuya UESHIN <ue...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/pandas/groupby.py | 16 +++++++--
python/pyspark/pandas/tests/test_groupby.py | 52 +++++++++++++++++++++++++++++
2 files changed, 66 insertions(+), 2 deletions(-)
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 79e0f4f..a61a024 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -42,6 +42,7 @@ from typing import (
cast,
TYPE_CHECKING,
)
+import warnings
import pandas as pd
from pandas.api.types import is_hashable, is_list_like
@@ -1207,10 +1208,11 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
pdf[groupkey_name].rename(psser.name)
for groupkey_name, psser in zip(groupkey_names, self._groupkeys)
]
+ grouped = pdf.groupby(groupkeys)
if is_series_groupby:
- pser_or_pdf = pdf.groupby(groupkeys)[name].apply(pandas_apply, *args, **kwargs)
+ pser_or_pdf = grouped[name].apply(pandas_apply, *args, **kwargs)
else:
- pser_or_pdf = pdf.groupby(groupkeys).apply(pandas_apply, *args, **kwargs)
+ pser_or_pdf = grouped.apply(pandas_apply, *args, **kwargs)
psser_or_psdf = ps.from_pandas(pser_or_pdf)
if len(pdf) <= limit:
@@ -1218,6 +1220,14 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
psser_or_psdf = psser_or_psdf.rename(cast(SeriesGroupBy, self)._psser.name)
return cast(Union[Series, DataFrame], psser_or_psdf)
+ if len(grouped) <= 1:
+ with warnings.catch_warnings():
+ warnings.simplefilter("always")
+ warnings.warn(
+ "The amount of data for return type inference might not be large enough. "
+ "Consider increasing an option `compute.shortcut_limit`."
+ )
+
if isinstance(psser_or_psdf, Series):
should_return_series = True
psdf_from_pandas = psser_or_psdf._psdf
@@ -1295,6 +1305,8 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
pdf_or_ser = pdf.groupby(groupkey_names)[name].apply(wrapped_func, *args, **kwargs)
else:
pdf_or_ser = pdf.groupby(groupkey_names).apply(wrapped_func, *args, **kwargs)
+ if should_return_series and isinstance(pdf_or_ser, pd.DataFrame):
+ pdf_or_ser = pdf_or_ser.stack()
if not isinstance(pdf_or_ser, pd.DataFrame):
return pd.DataFrame(pdf_or_ser)
diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 2b6cba9..4e1c0d0 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -2114,6 +2114,58 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
)
self.assert_eq(acc.value, 4)
+ def test_apply_return_series(self):
+ # SPARK-36907: groupby().apply() whose UDF returns a Series (a row via iloc, or a column) must match pandas.
+ pdf = pd.DataFrame(
+ {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
+ columns=["a", "b", "c"],
+ )
+ psdf = ps.from_pandas(pdf)
+
+ self.assert_eq(
+ psdf.groupby("b").apply(lambda x: x.iloc[0]).sort_index(),
+ pdf.groupby("b").apply(lambda x: x.iloc[0]).sort_index(),
+ )
+ self.assert_eq(
+ psdf.groupby("b").apply(lambda x: x["a"]).sort_index(),
+ pdf.groupby("b").apply(lambda x: x["a"]).sort_index(),
+ )
+ self.assert_eq(
+ psdf.groupby(["b", "c"]).apply(lambda x: x.iloc[0]).sort_index(),
+ pdf.groupby(["b", "c"]).apply(lambda x: x.iloc[0]).sort_index(),
+ )
+ self.assert_eq(
+ psdf.groupby(["b", "c"]).apply(lambda x: x["a"]).sort_index(),
+ pdf.groupby(["b", "c"]).apply(lambda x: x["a"]).sort_index(),
+ )
+
+ # Repeat the same checks with multi-index columns (tuple column labels as group keys).
+ columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+ pdf.columns = columns
+ psdf.columns = columns
+
+ self.assert_eq(
+ psdf.groupby(("x", "b")).apply(lambda x: x.iloc[0]).sort_index(),
+ pdf.groupby(("x", "b")).apply(lambda x: x.iloc[0]).sort_index(),
+ )
+ self.assert_eq(
+ psdf.groupby(("x", "b")).apply(lambda x: x[("x", "a")]).sort_index(),
+ pdf.groupby(("x", "b")).apply(lambda x: x[("x", "a")]).sort_index(),
+ )
+ self.assert_eq(
+ psdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x.iloc[0]).sort_index(),
+ pdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x.iloc[0]).sort_index(),
+ )
+ self.assert_eq(
+ psdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x[("x", "a")]).sort_index(),
+ pdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x[("x", "a")]).sort_index(),
+ )
+
+ def test_apply_return_series_without_shortcut(self):
+ # SPARK-36907: rerun the same checks with compute.shortcut_limit=2 (below the 6-row test data) to exercise the non-shortcut path.
+ with ps.option_context("compute.shortcut_limit", 2):
+ self.test_apply_return_series()
+
def test_transform(self):
pdf = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org