Posted to commits@spark.apache.org by ru...@apache.org on 2022/06/05 01:49:58 UTC
[spark] branch master updated: [SPARK-39284][PS] Implement Groupby.mad
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d793c5c6858 [SPARK-39284][PS] Implement Groupby.mad
d793c5c6858 is described below
commit d793c5c6858cb3d89fd981495a85f4c60ae63035
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Sun Jun 5 09:49:24 2022 +0800
[SPARK-39284][PS] Implement Groupby.mad
### What changes were proposed in this pull request?
Implement Groupby.mad
### Why are the changes needed?
To increase pandas API coverage.
### Does this PR introduce _any_ user-facing change?
Yes, `GroupBy.mad` is now supported:
```
In [6]: pdf = pd.DataFrame({"A": [1, 2, 2, 1, 1], "B": [3, 2, 3, 9, 0], "C": [3, 4, 13, -14, 9]})
In [7]: psdf = ps.from_pandas(pdf)
In [8]: pdf.groupby("A")[["B", "C"]].mad()
Out[8]:
          B         C
A
1  3.333333  8.888889
2  0.500000  4.500000

In [9]: psdf.groupby("A")[["B", "C"]].mad()
Out[9]:
          B         C
A
1  3.333333  8.888889
2  0.500000  4.500000

In [10]: pdf.B.groupby(pdf.A).mad()
Out[10]:
A
1    3.333333
2    0.500000
Name: B, dtype: float64

In [11]: psdf.B.groupby(psdf.A).mad()
Out[11]:
A
1    3.333333
2    0.500000
Name: B, dtype: float64
```
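For reference, the approach the diff below takes can be sketched in plain PySpark: Spark does not allow a window function inside an aggregate function, so the per-group mean is computed with a window first, and the absolute deviations are averaged in a separate groupby step. This is a minimal standalone illustration, not part of the patch; the names `sdf`, `window`, and `deviations` are ours, and the data mirrors column "B" of the example above:
```
# A minimal standalone sketch (not part of this patch) of the two-step
# computation: a window average per group, then the mean of the
# absolute deviations.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [(1, 3), (2, 2), (2, 3), (1, 9), (1, 0)], ["A", "B"]
)

# Per-group mean via an unbounded window, then the absolute deviation per row.
window = Window.partitionBy("A").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
deviations = sdf.withColumn("dev", F.abs(F.col("B") - F.avg("B").over(window)))

# Averaging the deviations per group yields the mean absolute deviation.
deviations.groupby("A").agg(F.avg("dev").alias("B")).orderBy("A").show()
# A=1 -> 3.333333..., A=2 -> 0.5 (matching the outputs above)
```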
### How was this patch tested?
Added unit tests.
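The added test routes `mad` through the existing `_test_stat_func` helper, which checks the pandas-on-Spark result against plain pandas. A hypothetical hand-rolled equivalent of that parity check (assuming a pandas version that still provides `GroupBy.mad`, which was deprecated in pandas 2.0) might look like:
```
# A hypothetical standalone parity check, equivalent in spirit to the
# added test: the pandas-on-Spark result should match plain pandas.
import pandas as pd
import pyspark.pandas as ps
from pandas.testing import assert_frame_equal

pdf = pd.DataFrame({"A": [1, 2, 2, 1, 1], "B": [3, 2, 3, 9, 0]})
psdf = ps.from_pandas(pdf)

# to_pandas() materializes the distributed result for the comparison.
assert_frame_equal(
    psdf.groupby("A").mad().sort_index().to_pandas(),
    pdf.groupby("A").mad().sort_index(),
)
```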
Closes #36660 from zhengruifeng/ps_groupby_mad.
Lead-authored-by: Ruifeng Zheng <ru...@apache.org>
Co-authored-by: Ruifeng Zheng <ru...@foxmail.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
python/pyspark/pandas/groupby.py | 84 +++++++++++++++++++++++++++--
python/pyspark/pandas/missing/groupby.py | 2 -
python/pyspark/pandas/tests/test_groupby.py | 3 ++
3 files changed, 83 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index ce8a322c20b..4377ad6a5c9 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -753,6 +753,80 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             bool_to_numeric=True,
         )
 
+    # TODO: the 'axis', 'skipna', and 'level' parameters should be implemented.
+    def mad(self) -> FrameLike:
+        """
+        Compute mean absolute deviation of groups, excluding missing values.
+
+        .. versionadded:: 3.4.0
+
+        Examples
+        --------
+        >>> df = ps.DataFrame({"A": [1, 2, 1, 1], "B": [True, False, False, True],
+        ...                    "C": [3, 4, 3, 4], "D": ["a", "b", "b", "a"]})
+
+        >>> df.groupby("A").mad()
+                  B         C
+        A
+        1  0.444444  0.444444
+        2  0.000000  0.000000
+
+        >>> df.B.groupby(df.A).mad()
+        A
+        1    0.444444
+        2    0.000000
+        Name: B, dtype: float64
+
+        See Also
+        --------
+        pyspark.pandas.Series.groupby
+        pyspark.pandas.DataFrame.groupby
+        """
+        groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
+        internal, agg_columns, sdf = self._prepare_reduce(
+            groupkey_names=groupkey_names,
+            accepted_spark_types=(NumericType, BooleanType),
+            bool_to_numeric=False,
+        )
+        psdf: DataFrame = DataFrame(internal)
+
+        if len(psdf._internal.column_labels) > 0:
+            window = Window.partitionBy(groupkey_names).rowsBetween(
+                Window.unboundedPreceding, Window.unboundedFollowing
+            )
+            new_agg_scols = {}
+            new_stat_scols = []
+            for agg_column in agg_columns:
+                # 'self._reduce_for_stat_function' cannot be used directly here, since
+                # 'it is not allowed to use a window function inside an aggregate function';
+                # instead, temporary columns computing 'abs(x - avg(x))' are created first.
+                agg_column_name = agg_column._internal.data_spark_column_names[0]
+                new_agg_column_name = verify_temp_column_name(
+                    psdf._internal.spark_frame, "__tmp_agg_col_{}__".format(agg_column_name)
+                )
+                casted_agg_scol = F.col(agg_column_name).cast("double")
+                new_agg_scols[new_agg_column_name] = F.abs(
+                    casted_agg_scol - F.avg(casted_agg_scol).over(window)
+                )
+                new_stat_scols.append(F.avg(F.col(new_agg_column_name)).alias(agg_column_name))
+
+            sdf = (
+                psdf._internal.spark_frame.withColumns(new_agg_scols)
+                .groupby(groupkey_names)
+                .agg(*new_stat_scols)
+            )
+        else:
+            sdf = sdf.select(*groupkey_names).distinct()
+
+        internal = internal.copy(
+            spark_frame=sdf,
+            index_spark_columns=[scol_for(sdf, col) for col in groupkey_names],
+            data_spark_columns=[scol_for(sdf, col) for col in internal.data_spark_column_names],
+            data_fields=None,
+        )
+
+        return self._prepare_return(DataFrame(internal))
+
     def all(self, skipna: bool = True) -> FrameLike:
         """
         Returns True if all values in the group are truthful, else False.
@@ -805,7 +879,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         5    False
         """
         groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-        internal, sdf = self._prepare_reduce(groupkey_names)
+        internal, _, sdf = self._prepare_reduce(groupkey_names)
         psdf: DataFrame = DataFrame(internal)
 
         def sfun(scol: Column, scol_type: DataType) -> Column:
@@ -3022,7 +3096,9 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         `accepted_spark_types`.
         """
         groupkey_names = [SPARK_INDEX_NAME_FORMAT(i) for i in range(len(self._groupkeys))]
-        internal, sdf = self._prepare_reduce(groupkey_names, accepted_spark_types, bool_to_numeric)
+        internal, _, sdf = self._prepare_reduce(
+            groupkey_names, accepted_spark_types, bool_to_numeric
+        )
         psdf: DataFrame = DataFrame(internal)
 
         if len(psdf._internal.column_labels) > 0:
@@ -3072,7 +3148,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
         groupkey_names: List,
         accepted_spark_types: Optional[Tuple[Type[DataType], ...]] = None,
         bool_to_numeric: bool = False,
-    ) -> Tuple[InternalFrame, SparkDataFrame]:
+    ) -> Tuple[InternalFrame, List[Series], SparkDataFrame]:
         groupkey_scols = [s.alias(name) for s, name in zip(self._groupkeys_scols, groupkey_names)]
         agg_columns = []
         for psser in self._agg_columns:
@@ -3100,7 +3176,7 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             data_fields=[psser._internal.data_fields[0] for psser in agg_columns],
             column_label_names=self._psdf._internal.column_label_names,
         )
-        return internal, sdf
+        return internal, agg_columns, sdf
 
     @staticmethod
     def _resolve_grouping_from_diff_dataframes(
diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py
index 3ea443ebd6e..ce61b1df1e1 100644
--- a/python/pyspark/pandas/missing/groupby.py
+++ b/python/pyspark/pandas/missing/groupby.py
@@ -48,7 +48,6 @@ class MissingPandasLikeDataFrameGroupBy:
     groups = _unsupported_property("groups")
     hist = _unsupported_property("hist")
     indices = _unsupported_property("indices")
-    mad = _unsupported_property("mad")
     ngroups = _unsupported_property("ngroups")
     plot = _unsupported_property("plot")
     quantile = _unsupported_property("quantile")
@@ -82,7 +81,6 @@ class MissingPandasLikeSeriesGroupBy:
     indices = _unsupported_property("indices")
     is_monotonic_decreasing = _unsupported_property("is_monotonic_decreasing")
     is_monotonic_increasing = _unsupported_property("is_monotonic_increasing")
-    mad = _unsupported_property("mad")
     ngroups = _unsupported_property("ngroups")
     plot = _unsupported_property("plot")
     quantile = _unsupported_property("quantile")
diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 3709a696f2d..cff2ce706d8 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -1353,6 +1353,9 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         self._test_stat_func(lambda groupby_obj: groupby_obj.max(numeric_only=None))
         self._test_stat_func(lambda groupby_obj: groupby_obj.max(numeric_only=True))
 
+    def test_mad(self):
+        self._test_stat_func(lambda groupby_obj: groupby_obj.mad())
+
     def test_first(self):
         self._test_stat_func(lambda groupby_obj: groupby_obj.first())
         self._test_stat_func(lambda groupby_obj: groupby_obj.first(numeric_only=None))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org