You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ru...@apache.org on 2022/09/09 00:24:54 UTC
[spark] branch master updated: [SPARK-40332][PS] Implement `GroupBy.quantile`
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6577c43852d [SPARK-40332][PS] Implement `GroupBy.quantile`
6577c43852d is described below
commit 6577c43852dca1725ab77650103b7763e6520500
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Fri Sep 9 08:24:29 2022 +0800
[SPARK-40332][PS] Implement `GroupBy.quantile`
### What changes were proposed in this pull request?
Implement `GroupBy.quantile`
### Why are the changes needed?
Improve PS api coverage
### Does this PR introduce _any_ user-facing change?
yes, new API
```python
>>> df = ps.DataFrame([
... ['a', 1], ['a', 2], ['a', 3],
... ['b', 1], ['b', 3], ['b', 5]
... ], columns=['key', 'val'])
>>> df.groupby('key').quantile()
val
key
a 2.0
b 3.0
```
### How was this patch tested?
UT
Closes #37816 from Yikun/SPARK-40332.
Authored-by: Yikun Jiang <yi...@gmail.com>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
.../source/reference/pyspark.pandas/groupby.rst | 1 +
python/pyspark/pandas/groupby.py | 64 +++++++++++++++++++++-
python/pyspark/pandas/missing/groupby.py | 2 -
python/pyspark/pandas/tests/test_groupby.py | 42 ++++++++++++++
4 files changed, 106 insertions(+), 3 deletions(-)
diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst
index 24e3bde91f5..4c29964966c 100644
--- a/python/docs/source/reference/pyspark.pandas/groupby.rst
+++ b/python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -80,6 +80,7 @@ Computations / Descriptive Stats
GroupBy.sum
GroupBy.var
GroupBy.nunique
+ GroupBy.quantile
GroupBy.size
GroupBy.diff
GroupBy.idxmax
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 01163b61375..2e2e5540bd4 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -45,7 +45,7 @@ from typing import (
import warnings
import pandas as pd
-from pandas.api.types import is_hashable, is_list_like # type: ignore[attr-defined]
+from pandas.api.types import is_number, is_hashable, is_list_like # type: ignore[attr-defined]
if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
from pandas.core.common import _builtin_table # type: ignore[attr-defined]
@@ -58,6 +58,7 @@ from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions a
from pyspark.sql.types import (
BooleanType,
DataType,
+ DoubleType,
NumericType,
StructField,
StructType,
@@ -581,6 +582,67 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
F.mean, accepted_spark_types=(NumericType,), bool_to_numeric=True
)
+ # TODO: 'q' accepts list like type
+ def quantile(self, q: float = 0.5, accuracy: int = 10000) -> FrameLike:
+ """
+ Return group values at the given quantile.
+
+ .. versionadded:: 3.4.0
+
+ Parameters
+ ----------
+ q : float, default 0.5 (50% quantile)
+ Value between 0 and 1 providing the quantile to compute.
+ accuracy : int, optional
+ Default accuracy of approximation. Larger value means better accuracy.
+ The relative error can be deduced by 1.0 / accuracy.
+ This is a pandas-on-Spark specific parameter.
+
+ Returns
+ -------
+ pyspark.pandas.Series or pyspark.pandas.DataFrame
+ Return type determined by caller of GroupBy object.
+
+ Notes
+ -----
+ `quantile` in pandas-on-Spark uses a distributed percentile approximation
+ algorithm unlike pandas, so the result might differ from pandas; also the
+ `interpolation` parameter is not supported yet.
+
+ See Also
+ --------
+ pyspark.pandas.Series.quantile
+ pyspark.pandas.DataFrame.quantile
+ pyspark.sql.functions.percentile_approx
+
+ Examples
+ --------
+ >>> df = ps.DataFrame([
+ ... ['a', 1], ['a', 2], ['a', 3],
+ ... ['b', 1], ['b', 3], ['b', 5]
+ ... ], columns=['key', 'val'])
+
+ Groupby one column and return the quantile of the remaining columns in
+ each group.
+
+ >>> df.groupby('key').quantile()
+ val
+ key
+ a 2.0
+ b 3.0
+ """
+ if is_list_like(q):
+ raise NotImplementedError("q doesn't support for list like type for now")
+ if not is_number(q):
+ raise TypeError("must be real number, not %s" % type(q).__name__)
+ if not 0 <= q <= 1:
+ raise ValueError("'q' must be between 0 and 1. Got '%s' instead" % q)
+ return self._reduce_for_stat_function(
+ lambda col: F.percentile_approx(col.cast(DoubleType()), q, accuracy),
+ accepted_spark_types=(NumericType, BooleanType),
+ bool_to_numeric=True,
+ )
+
def min(self, numeric_only: Optional[bool] = False) -> FrameLike:
"""
Compute min of group values.
diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py
index e913835ca72..0694ae62a20 100644
--- a/python/pyspark/pandas/missing/groupby.py
+++ b/python/pyspark/pandas/missing/groupby.py
@@ -50,7 +50,6 @@ class MissingPandasLikeDataFrameGroupBy:
indices = _unsupported_property("indices")
ngroups = _unsupported_property("ngroups")
plot = _unsupported_property("plot")
- quantile = _unsupported_property("quantile")
tshift = _unsupported_property("tshift")
# Deprecated properties
@@ -81,7 +80,6 @@ class MissingPandasLikeSeriesGroupBy:
is_monotonic_increasing = _unsupported_property("is_monotonic_increasing")
ngroups = _unsupported_property("ngroups")
plot = _unsupported_property("plot")
- quantile = _unsupported_property("quantile")
tshift = _unsupported_property("tshift")
# Deprecated properties
diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 1076d867344..6e4aa6186c6 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -1357,6 +1357,48 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
with self.assertRaises(TypeError):
psdf.groupby("A")["C"].mean()
+ def test_quantile(self):
+ dfs = [
+ pd.DataFrame(
+ [["a", 1], ["a", 2], ["a", 3], ["b", 1], ["b", 3], ["b", 5]], columns=["key", "val"]
+ ),
+ pd.DataFrame(
+ [["a", True], ["a", True], ["a", False], ["b", True], ["b", True], ["b", False]],
+ columns=["key", "val"],
+ ),
+ ]
+ for df in dfs:
+ psdf = ps.from_pandas(df)
+ # q accept float and int between 0 and 1
+ for i in [0, 0.1, 0.5, 1]:
+ self.assert_eq(
+ df.groupby("key").quantile(q=i, interpolation="lower"),
+ psdf.groupby("key").quantile(q=i),
+ almost=True,
+ )
+ self.assert_eq(
+ df.groupby("key")["val"].quantile(q=i, interpolation="lower"),
+ psdf.groupby("key")["val"].quantile(q=i),
+ almost=True,
+ )
+ # raise ValueError when q not in [0, 1]
+ with self.assertRaises(ValueError):
+ psdf.groupby("key").quantile(q=1.1)
+ with self.assertRaises(ValueError):
+ psdf.groupby("key").quantile(q=-0.1)
+ with self.assertRaises(ValueError):
+ psdf.groupby("key").quantile(q=2)
+ with self.assertRaises(ValueError):
+ psdf.groupby("key").quantile(q=np.nan)
+ # raise TypeError when q type mismatch
+ with self.assertRaises(TypeError):
+ psdf.groupby("key").quantile(q="0.1")
+ # raise NotImplementedError when q is list like type
+ with self.assertRaises(NotImplementedError):
+ psdf.groupby("key").quantile(q=(0.1, 0.5))
+ with self.assertRaises(NotImplementedError):
+ psdf.groupby("key").quantile(q=[0.1, 0.5])
+
def test_min(self):
self._test_stat_func(lambda groupby_obj: groupby_obj.min())
self._test_stat_func(lambda groupby_obj: groupby_obj.min(numeric_only=None))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org