Posted to commits@spark.apache.org by ru...@apache.org on 2022/09/13 03:44:18 UTC
[spark] branch master updated: [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
This is an automated email from the ASF dual-hosted git repository.
ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f92b4941c63 [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
f92b4941c63 is described below
commit f92b4941c631526a387c6f23554db53fbf922b96
Author: Ruifeng Zheng <ru...@apache.org>
AuthorDate: Tue Sep 13 11:43:47 2022 +0800
[SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
### What changes were proposed in this pull request?
1. Add a dedicated aggregate expression (`PandasCovar`) for `DataFrame.cov`;
2. Add the missing `ddof` parameter to `DataFrame.cov`.
### Why are the changes needed?
For API coverage: pandas' `DataFrame.cov` accepts a `ddof` argument, which the pandas-on-Spark version was still missing.
### Does this PR introduce _any_ user-facing change?
Yes, `DataFrame.cov` gains a `ddof` parameter:
```
>>> np.random.seed(42)
>>> df = ps.DataFrame(np.random.randn(1000, 5),
... columns=['a', 'b', 'c', 'd', 'e'])
>>> df.cov()
          a         b         c         d         e
a  0.998438 -0.020161  0.059277 -0.008943  0.014144
b -0.020161  1.059352 -0.008543 -0.024738  0.009826
c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
e  0.014144  0.009826 -0.000271 -0.013692  0.977795
>>> df.cov(ddof=2)
          a         b         c         d         e
a  0.999439 -0.020181  0.059336 -0.008952  0.014159
b -0.020181  1.060413 -0.008551 -0.024762  0.009836
c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
e  0.014159  0.009836 -0.000271 -0.013705  0.978775
>>> df.cov(ddof=-1)
          a         b         c         d         e
a  0.996444 -0.020121  0.059158 -0.008926  0.014116
b -0.020121  1.057235 -0.008526 -0.024688  0.009807
c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
e  0.014116  0.009807 -0.000270 -0.013664  0.975842
```
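For reference, pandas uses the same `N - ddof` divisor convention, so the new parameter can be sanity-checked against plain pandas/NumPy. A minimal check (illustrative only, not part of this patch; assumes pandas >= 1.1, where `DataFrame.cov` gained `ddof`):
```python
import numpy as np
import pandas as pd

# The divisor is N - ddof: ddof=0 gives the population covariance,
# ddof=1 (the default) the sample covariance.
np.random.seed(42)
pdf = pd.DataFrame(np.random.randn(1000, 5), columns=list("abcde"))
for ddof in (-1, 0, 1, 2):
    assert np.allclose(pdf.cov(ddof=ddof),
                       np.cov(pdf.to_numpy(), rowvar=False, ddof=ddof))
```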
### How was this patch tested?
Added unit tests.
Closes #37829 from zhengruifeng/ps_cov_ddof.
Authored-by: Ruifeng Zheng <ru...@apache.org>
Signed-off-by: Ruifeng Zheng <ru...@apache.org>
---
python/pyspark/pandas/frame.py | 31 +++++++++++++++++-----
python/pyspark/pandas/spark/functions.py | 5 ++++
python/pyspark/pandas/tests/test_dataframe.py | 10 +++++++
.../expressions/aggregate/Covariance.scala | 22 +++++++++++++++
.../spark/sql/api/python/PythonSQLUtils.scala | 4 +++
5 files changed, 66 insertions(+), 6 deletions(-)
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 2a7fda2d527..3438d07896e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -8738,8 +8738,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
self._update_internal_frame(internal, check_same_anchor=False)
- # TODO: ddof should be implemented.
- def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+ def cov(self, min_periods: Optional[int] = None, ddof: int = 1) -> "DataFrame":
"""
Compute pairwise covariance of columns, excluding NA/null values.
@@ -8755,8 +8754,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
below this threshold will be returned as ``NaN``.
This method is generally used for the analysis of time series data to
- understand the relationship between different measures
- across time.
+ understand the relationship between different measures across time.
.. versionadded:: 3.3.0
@@ -8765,6 +8763,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
min_periods : int, optional
Minimum number of observations required per pair of columns
to have a valid result.
+ ddof : int, default 1
+ Delta degrees of freedom. The divisor used in calculations
+ is ``N - ddof``, where ``N`` represents the number of elements.
+
+ .. versionadded:: 3.4.0
Returns
-------
@@ -8794,6 +8797,20 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+ >>> df.cov(ddof=2)
+           a         b         c         d         e
+ a  0.999439 -0.020181  0.059336 -0.008952  0.014159
+ b -0.020181  1.060413 -0.008551 -0.024762  0.009836
+ c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
+ d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
+ e  0.014159  0.009836 -0.000271 -0.013705  0.978775
+ >>> df.cov(ddof=-1)
+           a         b         c         d         e
+ a  0.996444 -0.020121  0.059158 -0.008926  0.014116
+ b -0.020121  1.057235 -0.008526 -0.024688  0.009807
+ c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
+ d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
+ e  0.014116  0.009807 -0.000270 -0.013664  0.975842
**Minimum number of periods**
@@ -8813,6 +8830,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
b NaN 1.248003 0.191417
c -0.150812 0.191417 0.895202
"""
+ if not isinstance(ddof, int):
+ raise TypeError("ddof must be integer")
min_periods = 1 if min_periods is None else min_periods
# Only compute covariance for Boolean and Numeric except Decimal
@@ -8891,8 +8910,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
step += r
for c in range(r, num_cols):
cov_scols.append(
- F.covar_samp(
- F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+ SF.covar(
+ F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double"), ddof
)
if count_not_null[r * num_cols + c - step] >= min_periods
else F.lit(None)
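The hunk above swaps `F.covar_samp` for the new `SF.covar` helper inside the pairwise upper-triangle loop, threading `ddof` through. For intuition only: since `covar_pop` divides by `N`, the pandas-style divisor could also be emulated with built-in PySpark aggregates (a sketch under that assumption, not how the patch implements it):
```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, 2.0), (2.0, 1.0), (4.0, 3.0)], ["x", "y"])

ddof = 2  # ddof=1 reproduces covar_samp; ddof=0 reproduces covar_pop
n = F.count(F.lit(1))
# cov_ddof = sum((x - mean_x) * (y - mean_y)) / (N - ddof)
#          = covar_pop(x, y) * N / (N - ddof)
df.select((F.covar_pop("x", "y") * n / (n - F.lit(ddof))).alias("cov")).show()
```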
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index 58715b5f781..79d73dcd7ea 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -51,6 +51,11 @@ def mode(col: Column, dropna: bool) -> Column:
return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna))
+def covar(col1: Column, col2: Column, ddof: int) -> Column:
+ sc = SparkContext._active_spark_context
+ return Column(sc._jvm.PythonSQLUtils.pandasCovar(col1._jc, col2._jc, ddof))
+
+
def repeat(col: Column, n: Union[int, Column]) -> Column:
"""
Repeats a string column n times, and returns it as a new string column.
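Like `pandasMode` just above it, the new `covar` wrapper crosses the Py4J bridge into the JVM-side `PythonSQLUtils` helper added later in this patch. Roughly how `frame.py` ends up calling it (illustrative; assumes an active Spark session and the `SF` alias used in `frame.py`):
```python
from pyspark.sql import functions as F
from pyspark.pandas.spark import functions as SF

# Both inputs are cast to double first, mirroring the loop in frame.py.
cov_col = SF.covar(F.col("a").cast("double"), F.col("b").cast("double"), 1)
```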
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 464cf09b45f..f491da72fd5 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -6958,6 +6958,16 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))
+ # ddof
+ with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+ psdf.cov(ddof="ddof")
+ for ddof in [-1, 0, 2]:
+ self.assert_eq(pdf.cov(ddof=ddof), psdf.cov(ddof=ddof), almost=True)
+ self.assert_eq(
+ pdf.cov(min_periods=4, ddof=ddof), psdf.cov(min_periods=4, ddof=ddof), almost=True
+ )
+ self.assert_eq(pdf.cov(min_periods=5, ddof=ddof), psdf.cov(min_periods=5, ddof=ddof))
+
# bool
pdf = pd.DataFrame(
{
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
index 7a856a05b6f..ff31fb1128b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
@@ -143,3 +143,25 @@ case class CovSample(
override protected def withNewChildrenInternal(
newLeft: Expression, newRight: Expression): CovSample = copy(left = newLeft, right = newRight)
}
+
+/**
+ * Covariance in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
+ * Refer to numpy.cov.
+ */
+case class PandasCovar(
+ override val left: Expression,
+ override val right: Expression,
+ ddof: Int)
+ extends Covariance(left, right, true) {
+
+ override val evaluateExpression: Expression = {
+ If(n === 0.0, Literal.create(null, DoubleType),
+ If(n === ddof, divideByZeroEvalResult, ck / (n - ddof)))
+ }
+ override def prettyName: String = "pandas_covar"
+
+ override protected def withNewChildrenInternal(
+ newLeft: Expression,
+ newRight: Expression): PandasCovar =
+ copy(left = newLeft, right = newRight)
+}
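`evaluateExpression` covers two edge cases: with no rows the result is null, and when `n == ddof` the divisor `n - ddof` would be zero, so the expression returns `divideByZeroEvalResult` rather than dividing by zero. Plain pandas, by contrast, lets the divisor vanish; a quick illustration (not part of the patch):
```python
import pandas as pd

# Two rows with ddof=2 makes the divisor N - ddof = 0, so the
# covariances degenerate to inf instead of a finite number.
pdf = pd.DataFrame({"a": [1.0, 2.0], "b": [2.0, 4.0]})
print(pdf.cov(ddof=2))
```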
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 538356cd8c8..2b74bcc3850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -138,6 +138,10 @@ private[sql] object PythonSQLUtils extends Logging {
def pandasMode(e: Column, ignoreNA: Boolean): Column = {
Column(PandasMode(e.expr, ignoreNA).toAggregateExpression(false))
}
+
+ def pandasCovar(col1: Column, col2: Column, ddof: Int): Column = {
+ Column(PandasCovar(col1.expr, col2.expr, ddof).toAggregateExpression(false))
+ }
}
/**