Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/12 02:30:00 UTC

[GitHub] [spark] HyukjinKwon commented on a change in pull request #33814: [SPARK-36396][PYTHON] Implement DataFrame.cov

HyukjinKwon commented on a change in pull request #33814:
URL: https://github.com/apache/spark/pull/33814#discussion_r706731042



##########
File path: python/pyspark/pandas/frame.py
##########
@@ -8164,6 +8165,137 @@ def update(self, other: "DataFrame", join: str = "left", overwrite: bool = True)
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, requires_same_anchor=False)
 
+    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+        """
+        Compute pairwise covariance of columns, excluding NA/null values.
+
+        Compute the pairwise covariance among the series of a DataFrame.
+        The returned data frame is the `covariance matrix
+        <https://en.wikipedia.org/wiki/Covariance_matrix>`__ of the columns
+        of the DataFrame.
+
+        Both NA and null values are automatically excluded from the
+        calculation. A threshold can be set for the minimum number of
+        observations required for each pair of columns; entries for pairs
+        with fewer observations are returned as ``NaN``.
+
+        This method is generally used for the analysis of time series data to
+        understand the relationship between different measures
+        across time.
+
+        .. versionadded:: 3.3.0
+
+        Parameters
+        ----------
+        min_periods : int, optional
+            Minimum number of observations required per pair of columns
+            to have a valid result.
+
+        Returns
+        -------
+        DataFrame
+            The covariance matrix of the series of the DataFrame.
+
+        See Also
+        --------
+        Series.cov : Compute covariance with another Series.
+
+        Examples
+        --------
+        >>> df = ps.DataFrame([(1, 2), (0, 3), (2, 0), (1, 1)],
+        ...                   columns=['dogs', 'cats'])
+        >>> df.cov()
+                  dogs      cats
+        dogs  0.666667 -1.000000
+        cats -1.000000  1.666667
+
+        >>> np.random.seed(42)
+        >>> df = ps.DataFrame(np.random.randn(1000, 5),
+        ...                   columns=['a', 'b', 'c', 'd', 'e'])
+        >>> df.cov()
+                  a         b         c         d         e
+        a  0.998438 -0.020161  0.059277 -0.008943  0.014144
+        b -0.020161  1.059352 -0.008543 -0.024738  0.009826
+        c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
+        d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
+        e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+
+        **Minimum number of periods**
+
+        This method also supports an optional ``min_periods`` keyword
+        that specifies the required minimum number of non-NA observations for
+        each column pair in order to have a valid result:
+
+        >>> np.random.seed(42)
+        >>> df = pd.DataFrame(np.random.randn(20, 3),
+        ...                   columns=['a', 'b', 'c'])
+        >>> df.loc[df.index[:5], 'a'] = np.nan
+        >>> df.loc[df.index[5:10], 'b'] = np.nan
+        >>> sdf = ps.from_pandas(df)
+        >>> sdf.cov(min_periods=12)
+                  a         b         c
+        a  0.316741       NaN -0.150812
+        b       NaN  1.248003  0.191417
+        c -0.150812  0.191417  0.895202
+        """
+        min_periods = 1 if min_periods is None else min_periods
+
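+        # Covariance is computed only over boolean and numeric columns;
+        # decimal columns are excluded.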
+        psdf = self[
+            [
+                col
+                for col in self.columns
+                if isinstance(self[col].spark.data_type, BooleanType)
+                or (
+                    isinstance(self[col].spark.data_type, NumericType)
+                    and not isinstance(self[col].spark.data_type, DecimalType)
+                )
+            ]
+        ]
+
+        num_cols = len(psdf.columns)
+        data_cols = psdf._internal.data_spark_column_names
+
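+        # Walk the upper triangle of the column-pair matrix. ``step``
+        # accumulates the number of lower-triangle cells skipped so far,
+        # so ``r * num_cols + c - step`` is the position of pair (r, c)
+        # in the flat lists built below.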
+        def _loop(func: Callable) -> None:
+            step = 0
+            for r in range(0, num_cols):
+                step += r
+                for c in range(r, num_cols):
+                    func(r, c, step)
+
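+        # For every column pair, build an expression that counts the rows
+        # where both columns are non-null.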
+        count_not_null_scols = []
+        _loop(
+            lambda r, c, _: count_not_null_scols.append(
+                F.count(
+                    F.when(F.col(data_cols[r]).isNotNull() & F.col(data_cols[c]).isNotNull(), 1)
+                )
+            ),
+        )
+
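+        # First job: evaluate all pairwise counts at once. NaNs are
+        # replaced with nulls beforehand so they are excluded from the
+        # counts.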
+        count_not_null = (
+            psdf._internal.spark_frame.replace(float("nan"), None)
+            .select(*count_not_null_scols)
+            .head(1)[0]
+        )
+
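+        # Keep the sample covariance only for pairs with at least
+        # ``min_periods`` common observations; otherwise emit null.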
+        cov_scols = []
+        _loop(
+            lambda r, c, step: cov_scols.append(
+                F.covar_samp(F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double"))
+                if count_not_null[r * num_cols + c - step] >= min_periods
+                else F.lit(None)
+            ),
+        )
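+        # Second job: collect every pairwise covariance into a single Row.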
+        pair_cov = psdf._internal.spark_frame.select(*cov_scols).head(1)[0]

Review comment:
       Can we do this from here to below without using a Spark job?
   
   @dgd-contributor it would be great to show some intermediate results and explain them with comments. It's a bit difficult to follow the logic.
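
    For illustration, a hypothetical pure-Python sketch (not part of the
    PR; the ``pairs`` list is made up for demonstration) of how the flat
    index ``r * num_cols + c - step`` used above maps each upper-triangle
    pair (r, c) to its position in the collected Row, assuming 3 columns:

    num_cols = 3  # small example; the PR derives this from the frame
    pairs = []
    step = 0
    for r in range(num_cols):
        step += r  # number of lower-triangle cells skipped so far
        for c in range(r, num_cols):
            # same formula the PR uses to index count_not_null
            pairs.append((r * num_cols + c - step, (r, c)))
    print(pairs)
    # [(0, (0, 0)), (1, (0, 1)), (2, (0, 2)),
    #  (3, (1, 1)), (4, (1, 2)), (5, (2, 2))]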




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


