You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/04/13 09:09:23 UTC

[spark] branch master updated: [SPARK-38774][PYTHON] Implement Series.autocorr

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new eb699ec138d [SPARK-38774][PYTHON] Implement Series.autocorr
eb699ec138d is described below

commit eb699ec138d4a49ecc204f530eeefa513b42f4ad
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Wed Apr 13 18:09:06 2022 +0900

    [SPARK-38774][PYTHON] Implement Series.autocorr
    
    ### What changes were proposed in this pull request?
    Implement Series.autocorr
    
    ### Why are the changes needed?
    To improve pandas API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `Series` now supports the `autocorr` function.
    
    ```
    In [86]: s = pd.Series([.2, .0, .6, .2, np.nan, .5, .6])
    
    In [87]: s.autocorr()
    Out[87]: -0.14121975762272054
    ```
    
    ### How was this patch tested?
    Added doctests and a unit test (`test_autocorr`).
    
    Closes #36048 from zhengruifeng/pandas_series_autocorr.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../source/reference/pyspark.pandas/series.rst     |  1 +
 python/pyspark/pandas/missing/series.py            |  1 -
 python/pyspark/pandas/series.py                    | 76 ++++++++++++++++++++++
 python/pyspark/pandas/tests/test_series.py         | 17 +++++
 4 files changed, 94 insertions(+), 1 deletion(-)

diff --git a/python/docs/source/reference/pyspark.pandas/series.rst b/python/docs/source/reference/pyspark.pandas/series.rst
index b6a0d1e52d5..0f897ce2e14 100644
--- a/python/docs/source/reference/pyspark.pandas/series.rst
+++ b/python/docs/source/reference/pyspark.pandas/series.rst
@@ -134,6 +134,7 @@ Computations / Descriptive Stats
    Series.abs
    Series.all
    Series.any
+   Series.autocorr
    Series.between
    Series.clip
    Series.corr
diff --git a/python/pyspark/pandas/missing/series.py b/python/pyspark/pandas/missing/series.py
index 9bb191f1c81..07094b64bbb 100644
--- a/python/pyspark/pandas/missing/series.py
+++ b/python/pyspark/pandas/missing/series.py
@@ -33,7 +33,6 @@ class MissingPandasLikeSeries:
 
     # Functions
     asfreq = _unsupported_function("asfreq")
-    autocorr = _unsupported_function("autocorr")
     combine = _unsupported_function("combine")
     convert_dtypes = _unsupported_function("convert_dtypes")
     infer_objects = _unsupported_function("infer_objects")
diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py
index ced81b12e8c..d6cc4a8627d 100644
--- a/python/pyspark/pandas/series.py
+++ b/python/pyspark/pandas/series.py
@@ -3045,6 +3045,82 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             DataFrame(internal.with_new_sdf(sdf, index_fields=([None] * internal.index_level)))
         )
 
+    def autocorr(self, periods: int = 1) -> float:
+        """
+        Compute the lag-N autocorrelation.
+
+        This method computes the Pearson correlation between
+        the Series and its shifted self.
+
+        .. note:: the current implementation of autocorr uses Spark's Window without
+            a partition specification. This moves all data into a single partition
+            on a single machine and could cause serious performance degradation.
+            Avoid this method on very large datasets.
+
+        .. versionadded:: 3.4.0
+
+        Parameters
+        ----------
+        periods : int, default 1
+            Number of lags to apply before performing autocorrelation.
+
+        Returns
+        -------
+        float
+            The Pearson correlation between self and self.shift(periods).
+
+        See Also
+        --------
+        Series.corr : Compute the correlation between two Series.
+        Series.shift : Shift index by desired number of periods.
+        DataFrame.corr : Compute pairwise correlation of columns.
+
+        Notes
+        -----
+        If the Pearson correlation is not well defined, 'NaN' is returned.
+
+        Examples
+        --------
+        >>> s = ps.Series([.2, .0, .6, .2, np.nan, .5, .6])
+        >>> s.autocorr()  # doctest: +ELLIPSIS
+        -0.141219...
+        >>> s.autocorr(0)  # doctest: +ELLIPSIS
+        1.0...
+        >>> s.autocorr(2)  # doctest: +ELLIPSIS
+        0.970725...
+        >>> s.autocorr(-3)  # doctest: +ELLIPSIS
+        0.277350...
+        >>> s.autocorr(5)  # doctest: +ELLIPSIS
+        -1.000000...
+        >>> s.autocorr(6)  # doctest: +ELLIPSIS
+        nan
+
+        If the Pearson correlation is not well defined, then 'NaN' is returned.
+
+        >>> s = ps.Series([1, 0, 0, 0])
+        >>> s.autocorr()
+        nan
+        """
+        # This implementation is suboptimal because it moves all data to a single partition;
+        # a global sort should be used instead of a Window, but this is a start.
+        if not isinstance(periods, int):
+            raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__)
+
+        tmp_col = "__tmp_col__"
+        tmp_lag_col = "__tmp_lag_col__"
+        scol = self.spark.column.alias(tmp_col)
+        if periods == 0:  # lag 0 pairs each value with itself, so no window is needed
+            lag_col = scol.alias(tmp_lag_col)
+        else:
+            window = Window.orderBy(NATURAL_ORDER_COLUMN_NAME)  # unpartitioned (see note)
+            lag_col = F.lag(scol, periods).over(window).alias(tmp_lag_col)
+
+        return (
+            self._internal.spark_frame.select([scol, lag_col])
+            .dropna("any")  # keep only rows where both values are present
+            .corr(tmp_col, tmp_lag_col)
+        )
+
     def corr(self, other: "Series", method: str = "pearson") -> float:
         """
         Compute correlation with `other` Series, excluding missing values.
diff --git a/python/pyspark/pandas/tests/test_series.py b/python/pyspark/pandas/tests/test_series.py
index 0fac8ac6515..76d35c51196 100644
--- a/python/pyspark/pandas/tests/test_series.py
+++ b/python/pyspark/pandas/tests/test_series.py
@@ -3104,6 +3104,23 @@ class SeriesTest(PandasOnSparkTestCase, SQLTestUtils):
 
         self.assert_eq(psser1.combine_first(psser2), pser1.combine_first(pser2))
 
+    def test_autocorr(self):  # compare against pandas on several short float Series
+        pdf = pd.DataFrame({"s1": [0.90010907, 0.13484424, 0.62036035]})
+        self._test_autocorr(pdf)  # no missing values
+
+        pdf = pd.DataFrame({"s1": [0.90010907, np.nan, 0.13484424, 0.62036035]})
+        self._test_autocorr(pdf)  # one NaN in the middle
+
+        pdf = pd.DataFrame({"s1": [0.2, 0.0, 0.6, 0.2, np.nan, 0.5, 0.6]})
+        self._test_autocorr(pdf)  # same data as the autocorr docstring example
+
+    def _test_autocorr(self, pdf):  # check pandas-on-Spark against pandas for each lag
+        psdf = ps.from_pandas(pdf)
+        for lag in range(-10, 10):  # negative, zero, and lags longer than the data
+            p_autocorr = pdf["s1"].autocorr(lag)
+            ps_autocorr = psdf["s1"].autocorr(lag)
+            self.assert_eq(p_autocorr, ps_autocorr, almost=True)
+
     def test_cov(self):
         pdf = pd.DataFrame(
             {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org