Posted to commits@spark.apache.org by ru...@apache.org on 2022/09/13 03:44:18 UTC

[spark] branch master updated: [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`

This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f92b4941c63 [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
f92b4941c63 is described below

commit f92b4941c631526a387c6f23554db53fbf922b96
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Tue Sep 13 11:43:47 2022 +0800

    [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
    
    ### What changes were proposed in this pull request?
    1. Add a dedicated aggregate expression for `DataFrame.cov`.
    2. Add the missing `ddof` parameter to `DataFrame.cov`.
    
    ### Why are the changes needed?
    For API coverage: pandas' `DataFrame.cov` accepts a `ddof` argument, which was missing from the pandas API on Spark.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `DataFrame.cov` gains a new `ddof` parameter:
    
    ```
            >>> np.random.seed(42)
            >>> df = ps.DataFrame(np.random.randn(1000, 5),
            ...                   columns=['a', 'b', 'c', 'd', 'e'])
            >>> df.cov()
                      a         b         c         d         e
            a  0.998438 -0.020161  0.059277 -0.008943  0.014144
            b -0.020161  1.059352 -0.008543 -0.024738  0.009826
            c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
            d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
            e  0.014144  0.009826 -0.000271 -0.013692  0.977795
            >>> df.cov(ddof=2)
                      a         b         c         d         e
            a  0.999439 -0.020181  0.059336 -0.008952  0.014159
            b -0.020181  1.060413 -0.008551 -0.024762  0.009836
            c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
            d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
            e  0.014159  0.009836 -0.000271 -0.013705  0.978775
            >>> df.cov(ddof=-1)
                      a         b         c         d         e
            a  0.996444 -0.020121  0.059158 -0.008926  0.014116
            b -0.020121  1.057235 -0.008526 -0.024688  0.009807
            c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
            d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
            e  0.014116  0.009807 -0.000270 -0.013664  0.975842
    ```
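    The only thing `ddof` changes is the divisor of the centered cross-product
    sum, `N - ddof`. A minimal NumPy sketch (illustrative only, not part of this
    patch) that reproduces one cell of the matrices above:

    ```python
    import numpy as np

    np.random.seed(42)
    data = np.random.randn(1000, 5)
    x, y = data[:, 0], data[:, 1]  # columns 'a' and 'b'

    n = len(x)
    # Centered cross-product sum; ddof only changes its divisor.
    ck = np.sum((x - x.mean()) * (y - y.mean()))
    for ddof in (1, 2, -1):
        assert np.isclose(ck / (n - ddof), np.cov(x, y, ddof=ddof)[0, 1])
        print(ddof, ck / (n - ddof))
    ```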
    
    ### How was this patch tested?
    Added unit tests (see `python/pyspark/pandas/tests/test_dataframe.py`).
    
    Closes #37829 from zhengruifeng/ps_cov_ddof.
    
    Authored-by: Ruifeng Zheng <ru...@apache.org>
    Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
 python/pyspark/pandas/frame.py                     | 31 +++++++++++++++++-----
 python/pyspark/pandas/spark/functions.py           |  5 ++++
 python/pyspark/pandas/tests/test_dataframe.py      | 10 +++++++
 .../expressions/aggregate/Covariance.scala         | 22 +++++++++++++++
 .../spark/sql/api/python/PythonSQLUtils.scala      |  4 +++
 5 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 2a7fda2d527..3438d07896e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -8738,8 +8738,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, check_same_anchor=False)
 
-    # TODO: ddof should be implemented.
-    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+    def cov(self, min_periods: Optional[int] = None, ddof: int = 1) -> "DataFrame":
         """
         Compute pairwise covariance of columns, excluding NA/null values.
 
@@ -8755,8 +8754,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         below this threshold will be returned as ``NaN``.
 
         This method is generally used for the analysis of time series data to
-        understand the relationship between different measures
-        across time.
+        understand the relationship between different measures across time.
 
         .. versionadded:: 3.3.0
 
@@ -8765,6 +8763,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         min_periods : int, optional
             Minimum number of observations required per pair of columns
             to have a valid result.
+        ddof : int, default 1
+            Delta degrees of freedom.  The divisor used in calculations
+            is ``N - ddof``, where ``N`` represents the number of elements.
+
+            .. versionadded:: 3.4.0
 
         Returns
         -------
@@ -8794,6 +8797,20 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
         d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
         e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+        >>> df.cov(ddof=2)
+                  a         b         c         d         e
+        a  0.999439 -0.020181  0.059336 -0.008952  0.014159
+        b -0.020181  1.060413 -0.008551 -0.024762  0.009836
+        c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
+        d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
+        e  0.014159  0.009836 -0.000271 -0.013705  0.978775
+        >>> df.cov(ddof=-1)
+                  a         b         c         d         e
+        a  0.996444 -0.020121  0.059158 -0.008926  0.014116
+        b -0.020121  1.057235 -0.008526 -0.024688  0.009807
+        c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
+        d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
+        e  0.014116  0.009807 -0.000270 -0.013664  0.975842
 
         **Minimum number of periods**
 
@@ -8813,6 +8830,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         b       NaN  1.248003  0.191417
         c -0.150812  0.191417  0.895202
         """
+        if not isinstance(ddof, int):
+            raise TypeError("ddof must be integer")
         min_periods = 1 if min_periods is None else min_periods
 
         # Only compute covariance for Boolean and Numeric except Decimal
@@ -8891,8 +8910,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
             step += r
             for c in range(r, num_cols):
                 cov_scols.append(
-                    F.covar_samp(
-                        F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+                    SF.covar(
+                        F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double"), ddof
                     )
                     if count_not_null[r * num_cols + c - step] >= min_periods
                     else F.lit(None)
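The new `SF.covar` ultimately divides the centered cross-product sum `ck` by
`n - ddof`. On builds without this commit, a rough equivalent can be obtained by
rescaling the built-in sample covariance, since `covar_samp = ck / (n - 1)`; a
hedged sketch, not how the patch implements it:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(1000).select(F.rand(1).alias("x"), F.rand(2).alias("y"))

ddof = 2
n = F.count("x")
# covar_samp divides ck by (n - 1); rescale the divisor to (n - ddof).
cov_ddof = F.covar_samp("x", "y") * (n - F.lit(1)) / (n - F.lit(ddof))
sdf.select(cov_ddof.alias("cov_ddof2")).show()
```

Unlike the dedicated expression, this rescaling does not reproduce the
`n == ddof` and `n == 0` edge-case handling shown in `Covariance.scala` below.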
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index 58715b5f781..79d73dcd7ea 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -51,6 +51,11 @@ def mode(col: Column, dropna: bool) -> Column:
     return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna))
 
 
+def covar(col1: Column, col2: Column, ddof: int) -> Column:
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.PythonSQLUtils.pandasCovar(col1._jc, col2._jc, ddof))
+
+
 def repeat(col: Column, n: Union[int, Column]) -> Column:
     """
     Repeats a string column n times, and returns it as a new string column.
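On a build that includes this commit, the wrapper can be exercised directly; a
hypothetical usage sketch (it needs an active `SparkContext`, since the call is
forwarded to `PythonSQLUtils.pandasCovar` on the JVM):

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.pandas.spark import functions as SF

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0, 2.0), (2.0, 4.0), (3.0, 5.0)], ["x", "y"])

# ddof=0 yields the population covariance (divisor N).
sdf.select(SF.covar(F.col("x"), F.col("y"), 0).alias("cov_pop")).show()
```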
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 464cf09b45f..f491da72fd5 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -6958,6 +6958,16 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
         self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
         self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
 
+        # ddof
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.cov(ddof="ddof")
+        for ddof in [-1, 0, 2]:
+            self.assert_eq(pdf.cov(ddof=ddof), psdf.cov(ddof=ddof), almost=True)
+            self.assert_eq(
+                pdf.cov(min_periods=4, ddof=ddof), psdf.cov(min_periods=4, ddof=ddof), almost=True
+            )
+            self.assert_eq(pdf.cov(min_periods=5, ddof=ddof), psdf.cov(min_periods=5, ddof=ddof))
+
         # bool
         pdf = pd.DataFrame(
             {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
index 7a856a05b6f..ff31fb1128b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
@@ -143,3 +143,25 @@ case class CovSample(
   override protected def withNewChildrenInternal(
       newLeft: Expression, newRight: Expression): CovSample = copy(left = newLeft, right = newRight)
 }
+
+/**
+ * Covariance in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
+ * Refer to numpy.cov.
+ */
+case class PandasCovar(
+    override val left: Expression,
+    override val right: Expression,
+    ddof: Int)
+  extends Covariance(left, right, true) {
+
+  override val evaluateExpression: Expression = {
+    If(n === 0.0, Literal.create(null, DoubleType),
+      If(n === ddof, divideByZeroEvalResult, ck / (n - ddof)))
+  }
+  override def prettyName: String = "pandas_covar"
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): PandasCovar =
+    copy(left = newLeft, right = newRight)
+}
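`PandasCovar.evaluateExpression` reduces to a guarded division over the buffers
maintained by the parent `Covariance` class (`n`, the co-observation count, and
`ck`, the centered cross-product sum). A plain-Python paraphrase; it assumes the
`true` passed to the `Covariance` constructor is `nullOnDivideByZero`, as in the
sibling `CovSample`, so `divideByZeroEvalResult` evaluates to null here:

```python
from typing import Optional

def pandas_covar_eval(ck: float, n: float, ddof: int) -> Optional[float]:
    """Sketch of PandasCovar.evaluateExpression."""
    if n == 0.0:
        return None  # no co-observed rows -> null
    if n == ddof:
        return None  # divideByZeroEvalResult with nullOnDivideByZero=true
    return ck / (n - ddof)
```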
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 538356cd8c8..2b74bcc3850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -138,6 +138,10 @@ private[sql] object PythonSQLUtils extends Logging {
   def pandasMode(e: Column, ignoreNA: Boolean): Column = {
     Column(PandasMode(e.expr, ignoreNA).toAggregateExpression(false))
   }
+
+  def pandasCovar(col1: Column, col2: Column, ddof: Int): Column = {
+    Column(PandasCovar(col1.expr, col2.expr, ddof).toAggregateExpression(false))
+  }
 }
 
 /**

