Posted to commits@spark.apache.org by ru...@apache.org on 2022/06/05 01:49:58 UTC

[spark] branch master updated: [SPARK-39284][PS] Implement Groupby.mad

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d793c5c6858 [SPARK-39284][PS] Implement Groupby.mad
d793c5c6858 is described below

commit d793c5c6858cb3d89fd981495a85f4c60ae63035
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Sun Jun 5 09:49:24 2022 +0800

    [SPARK-39284][PS] Implement Groupby.mad
    
    ### What changes were proposed in this pull request?
    Implement Groupby.mad
    
    ### Why are the changes needed?
    To increase pandas API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    yes
    
    ```
    In [6]: pdf = pd.DataFrame({"A": [1, 2, 2, 1, 1], "B": [3, 2, 3, 9, 0], "C": [3, 4, 13, -14, 9]})
    
    In [7]: psdf = ps.from_pandas(pdf)
    
    In [8]: pdf.groupby("A")[["B", "C"]].mad()
    Out[8]:
              B         C
    A
    1  3.333333  8.888889
    2  0.500000  4.500000
    
    In [9]: psdf.groupby("A")[["B", "C"]].mad()
    Out[9]:
              B         C
    A
    1  3.333333  8.888889
    2  0.500000  4.500000
    
    In [10]: pdf.B.groupby(pdf.A).mad()
    Out[10]:
    A
    1    3.333333
    2    0.500000
    Name: B, dtype: float64
    
    In [11]: psdf.B.groupby(psdf.A).mad()
    Out[11]:
    A
    1    3.333333
    2    0.500000
    Name: B, dtype: float64
    
    ```
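
    As a quick sanity check on the numbers above (illustrative only, not part of
    the patch or the original session): mad is mean(|x - mean(x)|) per group, so
    for group `A == 1` the `B` values `[3, 9, 0]` have mean 4, and the mean of the
    absolute deviations `[1, 5, 4]` is `10 / 3 ≈ 3.333333`, matching both outputs:

    ```
    b = pdf.loc[pdf.A == 1, "B"]     # B values in group A == 1: [3, 9, 0]
    (b - b.mean()).abs().mean()      # mean of [1, 5, 4] == 10 / 3 ≈ 3.333333
    ```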
    
    ### How was this patch tested?
    Added unit tests.
    
    Closes #36660 from zhengruifeng/ps_groupby_mad.
    
    Lead-authored-by: Ruifeng Zheng <ru...@apache.org>
    Co-authored-by: Ruifeng Zheng <ru...@foxmail.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 python/pyspark/pandas/groupby.py            | 84 +++++++++++++++++++++++++++--
 python/pyspark/pandas/missing/groupby.py    |  2 -
 python/pyspark/pandas/tests/test_groupby.py |  3 ++
 3 files changed, 83 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index ce8a322c20b..4377ad6a5c9 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -753,6 +753,80 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             bool_to_numeric=True,
         )
 
+    # TODO: 'axis', 'skipna', 'level' parameter should be implemented.
+    def mad(self) -> FrameLike:
+        """
+        Compute mean absolute deviation of groups, excluding missing values.
+
+        .. versionadded:: 3.4.0
+
+        Examples
+        --------
+        >>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True],
+        ...                    "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})
+
+        >>> df.groupby("A").mad()
+                  B         C
+        A
+        1  0.444444  0.444444
+        2  0.000000  0.000000
+
+        >>> df.B.groupby(df.A).mad()
+        A
+        1    0.444444
+        2    0.000000
+        Name: B, dtype: float64
+
+        See Also
+        --------
+        pyspark.pandas.Series.groupby
+        pyspark.pandas.DataFrame.groupby
+        """
+        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
+        internal, agg_columns, sdf = self._prepare_reduce(
+            groupkey_names=groupkey_names,
+            accepted_spark_types=(NumericType, BooleanType),
+            bool_to_numeric=False,
+        )
+        psdf: DataFrame = DataFrame(internal)
+
+        if len(psdf._internal.column_labels) > 0:
+            window = Window.partitionBy(groupkey_names).rowsBetween(
+                Window.unboundedPreceding, Window.unboundedFollowing
+            )
+            new_agg_scols = {}
+            new_stat_scols = []
+            for agg_column in agg_columns:
+                # 'self._reduce_for_stat_function' cannot be used directly here, because
+                # 'it is not allowed to use a window function inside an aggregate function',
+                # so we create temporary columns to compute 'abs(x - avg(x))' first.
+                agg_column_name = agg_column._internal.data_spark_column_names[0]
+                new_agg_column_name = verify_temp_column_name(
+                    psdf._internal.spark_frame, "__tmp_agg_col_{}__".format(agg_column_name)
+                )
+                casted_agg_scol = F.col(agg_column_name).cast("double")
+                new_agg_scols[new_agg_column_name] = F.abs(
+                    casted_agg_scol - F.avg(casted_agg_scol).over(window)
+                )
+                new_stat_scols.append(F.avg(F.col(new_agg_column_name)).alias(agg_column_name))
+
+            sdf = (
+                psdf._internal.spark_frame.withColumns(new_agg_scols)
+                .groupby(groupkey_names)
+                .agg(*new_stat_scols)
+            )
+        else:
+            sdf = sdf.select(*groupkey_names).distinct()
+
+        internal = internal.copy(
+            spark_frame=sdf,
+            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
+            data_spark_columns=[scol_for(sdf, col) for col in internal.data_spark_column_names],
+            data_fields=None,
+        )
+
+        return self._prepare_return(DataFrame(internal))
+
     def all(self, skipna: bool = True) -> FrameLike:
         """
         Returns True if all values in the group are truthful, else False.
@@ -805,7 +879,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         5  False
         """
         groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-        internal, sdf = self._prepare_reduce(groupkey_names)
+        internal, _, sdf = self._prepare_reduce(groupkey_names)
         psdf: DataFrame = DataFrame(internal)
 
         def sfun(scol: Column, scol_type: DataType) -> Column:
@@ -3022,7 +3096,9 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
                          `accepted_spark_types`.
         """
         groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-        internal, sdf = self._prepare_reduce(groupkey_names, accepted_spark_types, bool_to_numeric)
+        internal, _, sdf = self._prepare_reduce(
+            groupkey_names, accepted_spark_types, bool_to_numeric
+        )
         psdf: DataFrame = DataFrame(internal)
 
         if len(psdf._internal.column_labels) > 0:
@@ -3072,7 +3148,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         groupkey_names: List,
         accepted_spark_types: Optional[Tuple[Type[DataType], ...]] = None,
         bool_to_numeric: bool = False,
-    ) -> Tuple[InternalFrame, SparkDataFrame]:
+    ) -> Tuple[InternalFrame, List[Series], SparkDataFrame]:
         groupkey_scols = [s.alias(name) for s, name in zip(self._groupkeys_scols, groupkey_names)]
         agg_columns = []
         for psser in self._agg_columns:
@@ -3100,7 +3176,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             data_fields=[psser._internal.data_fields[0] for psser in agg_columns],
             column_label_names=self._psdf._internal.column_label_names,
         )
-        return internal, sdf
+        return internal, agg_columns, sdf
 
     @staticmethod
     def _resolve_grouping_from_diff_dataframes(
diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py
index 3ea443ebd6e..ce61b1df1e1 100644
--- a/python/pyspark/pandas/missing/groupby.py
+++ b/python/pyspark/pandas/missing/groupby.py
@@ -48,7 +48,6 @@ class MissingPandasLikeDataFrameGroupBy:
     groups = _unsupported_property("groups")
     hist = _unsupported_property("hist")
     indices = _unsupported_property("indices")
-    mad = _unsupported_property("mad")
     ngroups = _unsupported_property("ngroups")
     plot = _unsupported_property("plot")
     quantile = _unsupported_property("quantile")
@@ -82,7 +81,6 @@ class MissingPandasLikeSeriesGroupBy:
     indices = _unsupported_property("indices")
     is_monotonic_decreasing = _unsupported_property("is_monotonic_decreasing")
     is_monotonic_increasing = _unsupported_property("is_monotonic_increasing")
-    mad = _unsupported_property("mad")
     ngroups = _unsupported_property("ngroups")
     plot = _unsupported_property("plot")
     quantile = _unsupported_property("quantile")
diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 3709a696f2d..cff2ce706d8 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -1353,6 +1353,9 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         self._test_stat_func(lambda groupby_obj: groupby_obj.max(numeric_only=None))
         self._test_stat_func(lambda groupby_obj: groupby_obj.max(numeric_only=True))
 
+    def test_mad(self):
+        self._test_stat_func(lambda groupby_obj: groupby_obj.mad())
+
     def test_first(self):
         self._test_stat_func(lambda groupby_obj: groupby_obj.first())
         self._test_stat_func(lambda groupby_obj: groupby_obj.first(numeric_only=None))


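The new `mad` cannot compute `avg(abs(x - avg(x)))` in a single aggregation, because the inner `avg` is a window function and, as the comment in the patch notes, a window function is not allowed inside an aggregate function. For readers who want to see the same two-step trick outside of pandas-on-Spark, here is a minimal, illustrative PySpark sketch (the DataFrame contents mirror the PR description; the temporary column names are made up for this example and are not the ones generated by the patch):

```
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Same data as the PR description: group by "A", take the MAD of "B" and "C".
sdf = spark.createDataFrame(
    [(1, 3, 3), (2, 2, 4), (2, 3, 13), (1, 9, -14), (1, 0, 9)],
    ["A", "B", "C"],
)

# Step 1: materialize the absolute deviations as ordinary columns, using a
# window that spans the whole group, since avg(...).over(...) cannot be
# nested inside the groupBy aggregation.
w = Window.partitionBy("A").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
deviations = (
    sdf.withColumn("__dev_b__", F.abs(F.col("B") - F.avg("B").over(w)))
    .withColumn("__dev_c__", F.abs(F.col("C") - F.avg("C").over(w)))
)

# Step 2: a plain group-by average of the deviation columns gives the MAD.
# Expected to match the pandas output above: A=1 -> (3.333333, 8.888889),
# A=2 -> (0.5, 4.5).
deviations.groupBy("A").agg(
    F.avg("__dev_b__").alias("B"), F.avg("__dev_c__").alias("C")
).orderBy("A").show()
```

The committed implementation does the same thing generically for every aggregation column, generating collision-free temporary column names with `verify_temp_column_name` before the final group-by average.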