You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2021/10/03 03:26:26 UTC

[spark] branch master updated: [SPARK-36907][PYTHON] Fix DataFrameGroupBy.apply without shortcut

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 38d3981  [SPARK-36907][PYTHON] Fix DataFrameGroupBy.apply without shortcut
38d3981 is described below

commit 38d39812c176e4b52a08397f7936f87ea32930e7
Author: Takuya UESHIN <ue...@databricks.com>
AuthorDate: Sun Oct 3 12:25:19 2021 +0900

    [SPARK-36907][PYTHON] Fix DataFrameGroupBy.apply without shortcut
    
    ### What changes were proposed in this pull request?
    
    Fix `DataFrameGroupBy.apply` without shortcut.
    
    Pandas' `DataFrameGroupBy.apply` sometimes behaves weirdly when the udf returns `Series` and whether there is only one group or more. E.g.,:
    
    ```py
    >>> pdf = pd.DataFrame(
    ...      {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
    ...      columns=["a", "b", "c"],
    ... )
    
    >>> pdf.groupby('b').apply(lambda x: x['a'])
    b
    1  0    1
       1    2
    2  2    3
    3  3    4
    5  4    5
    8  5    6
    Name: a, dtype: int64
    >>> pdf[pdf['b'] == 1].groupby('b').apply(lambda x: x['a'])
    a  0  1
    b
    1  1  2
    ```
    
    If there is only one group, it returns a "wide" `DataFrame` instead of `Series`.
    
    In our non-shortcut path, there is always only one group because it will be run in `groupby-applyInPandas`, so we will get `DataFrame`, then we should convert it to `Series` ourselves.
    
    ### Why are the changes needed?
    
    `DataFrameGroupBy.apply` without shortcut could raise an exception when it returns `Series`.
    
    ```py
    >>> ps.options.compute.shortcut_limit = 3
    >>> psdf = ps.DataFrame(
    ...     {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
    ...     columns=["a", "b", "c"],
    ... )
    >>> psdf.groupby("b").apply(lambda x: x["a"])
    org.apache.spark.api.python.PythonException: Traceback (most recent call last):
    ...
    ValueError: Length mismatch: Expected axis has 2 elements, new values have 3 elements
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    The error above will be gone:
    
    ```py
    >>> psdf.groupby("b").apply(lambda x: x["a"])
    b
    1  0    1
       1    2
    2  2    3
    3  3    4
    5  4    5
    8  5    6
    Name: a, dtype: int64
    ```
    
    ### How was this patch tested?
    
    Added tests.
    
    Closes #34160 from ueshin/issues/SPARK-36907/groupby-apply.
    
    Authored-by: Takuya UESHIN <ue...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/pandas/groupby.py            | 16 +++++++--
 python/pyspark/pandas/tests/test_groupby.py | 52 +++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 79e0f4f..a61a024 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -42,6 +42,7 @@ from typing import (
     cast,
     TYPE_CHECKING,
 )
+import warnings
 
 import pandas as pd
 from pandas.api.types import is_hashable, is_list_like
@@ -1207,10 +1208,11 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                 pdf[groupkey_name].rename(psser.name)
                 for groupkey_name, psser in zip(groupkey_names, self._groupkeys)
             ]
+            grouped = pdf.groupby(groupkeys)
             if is_series_groupby:
-                pser_or_pdf = pdf.groupby(groupkeys)[name].apply(pandas_apply, *args, **kwargs)
+                pser_or_pdf = grouped[name].apply(pandas_apply, *args, **kwargs)
             else:
-                pser_or_pdf = pdf.groupby(groupkeys).apply(pandas_apply, *args, **kwargs)
+                pser_or_pdf = grouped.apply(pandas_apply, *args, **kwargs)
             psser_or_psdf = ps.from_pandas(pser_or_pdf)
 
             if len(pdf) <= limit:
@@ -1218,6 +1220,14 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                     psser_or_psdf = psser_or_psdf.rename(cast(SeriesGroupBy, self)._psser.name)
                 return cast(Union[Series, DataFrame], psser_or_psdf)
 
+            if len(grouped) <= 1:
+                with warnings.catch_warnings():
+                    warnings.simplefilter("always")
+                    warnings.warn(
+                        "The amount of data for return type inference might not be large enough. "
+                        "Consider increasing an option `compute.shortcut_limit`."
+                    )
+
             if isinstance(psser_or_psdf, Series):
                 should_return_series = True
                 psdf_from_pandas = psser_or_psdf._psdf
@@ -1295,6 +1305,8 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                 pdf_or_ser = pdf.groupby(groupkey_names)[name].apply(wrapped_func, *args, **kwargs)
             else:
                 pdf_or_ser = pdf.groupby(groupkey_names).apply(wrapped_func, *args, **kwargs)
+                if should_return_series and isinstance(pdf_or_ser, pd.DataFrame):
+                    pdf_or_ser = pdf_or_ser.stack()
 
             if not isinstance(pdf_or_ser, pd.DataFrame):
                 return pd.DataFrame(pdf_or_ser)
diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 2b6cba9..4e1c0d0 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -2114,6 +2114,58 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         )
         self.assert_eq(acc.value, 4)
 
+    def test_apply_return_series(self):
+        # SPARK-36907: Fix DataFrameGroupBy.apply without shortcut.
+        pdf = pd.DataFrame(
+            {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},
+            columns=["a", "b", "c"],
+        )
+        psdf = ps.from_pandas(pdf)
+
+        self.assert_eq(
+            psdf.groupby("b").apply(lambda x: x.iloc[0]).sort_index(),
+            pdf.groupby("b").apply(lambda x: x.iloc[0]).sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby("b").apply(lambda x: x["a"]).sort_index(),
+            pdf.groupby("b").apply(lambda x: x["a"]).sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(["b", "c"]).apply(lambda x: x.iloc[0]).sort_index(),
+            pdf.groupby(["b", "c"]).apply(lambda x: x.iloc[0]).sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(["b", "c"]).apply(lambda x: x["a"]).sort_index(),
+            pdf.groupby(["b", "c"]).apply(lambda x: x["a"]).sort_index(),
+        )
+
+        # multi-index columns
+        columns = pd.MultiIndex.from_tuples([("x", "a"), ("x", "b"), ("y", "c")])
+        pdf.columns = columns
+        psdf.columns = columns
+
+        self.assert_eq(
+            psdf.groupby(("x", "b")).apply(lambda x: x.iloc[0]).sort_index(),
+            pdf.groupby(("x", "b")).apply(lambda x: x.iloc[0]).sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby(("x", "b")).apply(lambda x: x[("x", "a")]).sort_index(),
+            pdf.groupby(("x", "b")).apply(lambda x: x[("x", "a")]).sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x.iloc[0]).sort_index(),
+            pdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x.iloc[0]).sort_index(),
+        )
+        self.assert_eq(
+            psdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x[("x", "a")]).sort_index(),
+            pdf.groupby([("x", "b"), ("y", "c")]).apply(lambda x: x[("x", "a")]).sort_index(),
+        )
+
+    def test_apply_return_series_without_shortcut(self):
+        # SPARK-36907: Fix DataFrameGroupBy.apply without shortcut.
+        with ps.option_context("compute.shortcut_limit", 2):
+            self.test_apply_return_series()
+
     def test_transform(self):
         pdf = pd.DataFrame(
             {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]},

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org