Posted to commits@spark.apache.org by ru...@apache.org on 2022/09/09 00:24:54 UTC

[spark] branch master updated: [SPARK-40332][PS] Implement `GroupBy.quantile`

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6577c43852d [SPARK-40332][PS] Implement `GroupBy.quantile`
6577c43852d is described below

commit 6577c43852dca1725ab77650103b7763e6520500
Author: Yikun Jiang <yi...@gmail.com>
AuthorDate: Fri Sep 9 08:24:29 2022 +0800

    [SPARK-40332][PS] Implement `GroupBy.quantile`
    
    ### What changes were proposed in this pull request?
    Implement `GroupBy.quantile`
    
    ### Why are the changes needed?
    Improve pandas-on-Spark API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, this adds a new API:
    ```python
    >>> df = ps.DataFrame([
    ...     ['a', 1], ['a', 2], ['a', 3],
    ...     ['b', 1], ['b', 3], ['b', 5]
    ... ], columns=['key', 'val'])
    >>> df.groupby('key').quantile()
         val
    key
    a    2.0
    b    3.0
    ```
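    
    Non-default quantiles are selected through `q`, and the pandas-on-Spark
    specific `accuracy` parameter tunes the underlying approximation. A small
    sketch against the same frame as above (exact outputs omitted here, since
    the backend is a distributed approximation):
    ```python
    >>> df.groupby('key').quantile(q=0.25)
    >>> df.groupby('key').quantile(q=0.5, accuracy=100000)
    ```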
    
    ### How was this patch tested?
    Unit tests: `test_quantile` added in `python/pyspark/pandas/tests/test_groupby.py`.
    
    Closes #37816 from Yikun/SPARK-40332.
    
    Authored-by: Yikun Jiang <yi...@gmail.com>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 .../source/reference/pyspark.pandas/groupby.rst    |  1 +
 python/pyspark/pandas/groupby.py                   | 64 +++++++++++++++++++++-
 python/pyspark/pandas/missing/groupby.py           |  2 -
 python/pyspark/pandas/tests/test_groupby.py        | 42 ++++++++++++++
 4 files changed, 106 insertions(+), 3 deletions(-)

diff --git a/python/docs/source/reference/pyspark.pandas/groupby.rst b/python/docs/source/reference/pyspark.pandas/groupby.rst
index 24e3bde91f5..4c29964966c 100644
--- a/python/docs/source/reference/pyspark.pandas/groupby.rst
+++ b/python/docs/source/reference/pyspark.pandas/groupby.rst
@@ -80,6 +80,7 @@ Computations / Descriptive Stats
    GroupBy.sum
    GroupBy.var
    GroupBy.nunique
+   GroupBy.quantile
    GroupBy.size
    GroupBy.diff
    GroupBy.idxmax
diff --git a/python/pyspark/pandas/groupby.py b/python/pyspark/pandas/groupby.py
index 01163b61375..2e2e5540bd4 100644
--- a/python/pyspark/pandas/groupby.py
+++ b/python/pyspark/pandas/groupby.py
@@ -45,7 +45,7 @@ from typing import (
 import warnings
 
 import pandas as pd
-from pandas.api.types import is_hashable, is_list_like  # type: ignore[attr-defined]
+from pandas.api.types import is_number, is_hashable, is_list_like  # type: ignore[attr-defined]
 
 if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
     from pandas.core.common import _builtin_table  # type: ignore[attr-defined]
@@ -58,6 +58,7 @@ from pyspark.sql import Column, DataFrame as SparkDataFrame, Window, functions a
 from pyspark.sql.types import (
     BooleanType,
     DataType,
+    DoubleType,
     NumericType,
     StructField,
     StructType,
@@ -581,6 +582,67 @@ class GroupBy(Generic[FrameLike], metaclass=ABCMeta):
             F.mean, accepted_spark_types=(NumericType,), bool_to_numeric=True
         )
 
+    # TODO: support list-like types for 'q'
+    def quantile(self, q: float = 0.5, accuracy: int = 10000) -> FrameLike:
+        """
+        Return group values at the given quantile.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        q : float, default 0.5 (50% quantile)
+            Value between 0 and 1 providing the quantile to compute.
+        accuracy : int, optional
+            Accuracy of the approximation; a larger value yields better accuracy
+            at the cost of memory. The relative error is 1.0 / accuracy.
+            This is a pandas-on-Spark specific parameter.
+
+        Returns
+        -------
+        pyspark.pandas.Series or pyspark.pandas.DataFrame
+            Return type determined by caller of GroupBy object.
+
+        Notes
+        -----
+        Unlike pandas, `quantile` in pandas-on-Spark uses a distributed
+        percentile approximation algorithm, so the result may differ from
+        pandas. The `interpolation` parameter is not supported yet.
+
+        See Also
+        --------
+        pyspark.pandas.Series.quantile
+        pyspark.pandas.DataFrame.quantile
+        pyspark.sql.functions.percentile_approx
+
+        Examples
+        --------
+        >>> df = ps.DataFrame([
+        ...     ['a', 1], ['a', 2], ['a', 3],
+        ...     ['b', 1], ['b', 3], ['b', 5]
+        ... ], columns=['key', 'val'])
+
+        Groupby one column and return the quantile of the remaining columns in
+        each group.
+
+        >>> df.groupby('key').quantile()
+             val
+        key
+        a    2.0
+        b    3.0
+        """
+        if is_list_like(q):
+            raise NotImplementedError("q doesn't support list-like types for now")
+        if not is_number(q):
+            raise TypeError("must be real number, not %s" % type(q).__name__)
+        if not 0 <= q <= 1:
+            raise ValueError("'q' must be between 0 and 1. Got '%s' instead" % q)
+        return self._reduce_for_stat_function(
+            lambda col: F.percentile_approx(col.cast(DoubleType()), q, accuracy),
+            accepted_spark_types=(NumericType, BooleanType),
+            bool_to_numeric=True,
+        )
+
     def min(self, numeric_only: Optional[bool] = False) -> FrameLike:
         """
         Compute min of group values.
diff --git a/python/pyspark/pandas/missing/groupby.py b/python/pyspark/pandas/missing/groupby.py
index e913835ca72..0694ae62a20 100644
--- a/python/pyspark/pandas/missing/groupby.py
+++ b/python/pyspark/pandas/missing/groupby.py
@@ -50,7 +50,6 @@ class MissingPandasLikeDataFrameGroupBy:
     indices = _unsupported_property("indices")
     ngroups = _unsupported_property("ngroups")
     plot = _unsupported_property("plot")
-    quantile = _unsupported_property("quantile")
     tshift = _unsupported_property("tshift")
 
     # Deprecated properties
@@ -81,7 +80,6 @@ class MissingPandasLikeSeriesGroupBy:
     is_monotonic_increasing = _unsupported_property("is_monotonic_increasing")
     ngroups = _unsupported_property("ngroups")
     plot = _unsupported_property("plot")
-    quantile = _unsupported_property("quantile")
     tshift = _unsupported_property("tshift")
 
     # Deprecated properties
diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 1076d867344..6e4aa6186c6 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -1357,6 +1357,48 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
         with self.assertRaises(TypeError):
             psdf.groupby("A")["C"].mean()
 
+    def test_quantile(self):
+        dfs = [
+            pd.DataFrame(
+                [["a", 1], ["a", 2], ["a", 3], ["b", 1], ["b", 3], ["b", 5]], columns=["key", "val"]
+            ),
+            pd.DataFrame(
+                [["a", True], ["a", True], ["a", False], ["b", True], ["b", True], ["b", False]],
+                columns=["key", "val"],
+            ),
+        ]
+        for df in dfs:
+            psdf = ps.from_pandas(df)
+            # q accepts floats and ints between 0 and 1
+            for i in [0, 0.1, 0.5, 1]:
+                self.assert_eq(
+                    df.groupby("key").quantile(q=i, interpolation="lower"),
+                    psdf.groupby("key").quantile(q=i),
+                    almost=True,
+                )
+                self.assert_eq(
+                    df.groupby("key")["val"].quantile(q=i, interpolation="lower"),
+                    psdf.groupby("key")["val"].quantile(q=i),
+                    almost=True,
+                )
+            # raise ValueError when q is not in [0, 1]
+            with self.assertRaises(ValueError):
+                psdf.groupby("key").quantile(q=1.1)
+            with self.assertRaises(ValueError):
+                psdf.groupby("key").quantile(q=-0.1)
+            with self.assertRaises(ValueError):
+                psdf.groupby("key").quantile(q=2)
+            with self.assertRaises(ValueError):
+                psdf.groupby("key").quantile(q=np.nan)
+            # raise TypeError when q is not a number
+            with self.assertRaises(TypeError):
+                psdf.groupby("key").quantile(q="0.1")
+            # raise NotImplementedError when q is list-like
+            with self.assertRaises(NotImplementedError):
+                psdf.groupby("key").quantile(q=(0.1, 0.5))
+            with self.assertRaises(NotImplementedError):
+                psdf.groupby("key").quantile(q=[0.1, 0.5])
+
     def test_min(self):
         self._test_stat_func(lambda groupby_obj: groupby_obj.min())
         self._test_stat_func(lambda groupby_obj: groupby_obj.min(numeric_only=None))
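
For readers tracing the change above: `GroupBy.quantile` delegates to Spark's
`percentile_approx` after casting the column to double. Below is a minimal
standalone sketch of the roughly equivalent plain-PySpark aggregation; the
SparkSession setup and the `val` alias are illustrative, not part of the patch.

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import DoubleType

spark = SparkSession.builder.getOrCreate()

# Same toy data as in the PR description.
sdf = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 3), ("b", 5)],
    ["key", "val"],
)

# Roughly what df.groupby('key').quantile(q=0.5, accuracy=10000) computes:
# cast to double, then take the distributed approximate percentile per group.
sdf.groupBy("key").agg(
    F.percentile_approx(F.col("val").cast(DoubleType()), 0.5, 10000).alias("val")
).show()
```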


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org