You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/09 16:20:55 UTC

[GitHub] [beam] robertwb commented on a change in pull request #12982: [BEAM-9547] Dataframe covariance and correlation.

robertwb commented on a change in pull request #12982:
URL: https://github.com/apache/beam/pull/12982#discussion_r502197895



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
+    def compute_moments(x):
+      n = len(x)
+      m = x.std(ddof=0)**2 * n
+      s = x.sum()
+      return pd.DataFrame(dict(m=[m], s=[s], n=[n]))
+
+    def combine_moments(data):
+      m = s = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n = datum.m, datum.s, datum.n
+        else:
+          m += datum.m + (s / n - datum.s / datum.n)**2 * n * datum.n / (

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -150,8 +115,72 @@ def combine_co_moments(data, std_x, std_y):
           expressions.ComputedExpression(
               'corr',
               lambda df,
-              other: df.corr(other, method=method, DataFrame=min_periods)[
-                  self._expr, other._expr],
+              other: df.corr(other, method=method, min_periods=min_periods),
+              [self._expr, other._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  def _corr_aligned(self, other, method, min_periods):
+    std_x = self.std()
+    std_y = other.std()
+    cov = self._cov_aligned(other, min_periods)
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'normalize',
+              lambda cov,
+              std_x,
+              std_y: cov / (std_x * std_y),
+              [cov._expr, std_x._expr, std_y._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def cov(self, other, min_periods, ddof):
+    x, y = self.dropna().align(other.dropna(), 'inner')
+    return x._cov_aligned(y, min_periods, ddof)
+
+  def _cov_aligned(self, other, min_periods, ddof=1):
+    # Use the formulae from
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+    def compute_co_moments(x, y):
+      n = len(x)
+      if n <= 1:
+        c = 0
+      else:
+        c = x.corr(y) * x.std() * y.std() * (n - 1)
+      sx = x.sum()
+      sy = y.sum()
+      return pd.DataFrame(dict(c=[c], sx=[sx], sy=[sy], n=[n]))
+
+    def combine_co_moments(data):
+      c = sx = sy = n = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          c, sx, sy, n = datum.c, datum.sx, datum.sy, datum.n
+        else:
+          c += (
+              datum.c + (sx / n - datum.sx / datum.n) *
+              (sy / n - datum.sy / datum.n) * n * datum.n / (n + datum.n))
+          sx += datum.sx
+          sy += datum.sy
+          n += datum.n
+      if n < max(2, min_periods or 0):
+        return float('nan')

Review comment:
       Yes. Done.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -34,6 +36,124 @@ def __array__(self, dtype=None):
 
   between = frame_base._elementwise_method('between')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def std(self, axis, skipna, level, ddof, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna:
+      self = self.dropna()
+
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance

Review comment:
       Done. Even if the subheadings change, they're descriptive enough to identify what is meant.

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -36,7 +36,7 @@ def _run_test(self, func, *args):
             expressions.ConstantExpression(arg, arg[0:0])) for arg in args
     ]
     expected = func(*args)
-    actual = expressions.PartitioningSession({}).evaluate(
+    actual = expressions.Session({}).evaluate(

Review comment:
       Some of the other tests in this file were relying on the ordering. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -150,8 +115,72 @@ def combine_co_moments(data, std_x, std_y):
           expressions.ComputedExpression(
               'corr',
               lambda df,
-              other: df.corr(other, method=method, DataFrame=min_periods)[
-                  self._expr, other._expr],
+              other: df.corr(other, method=method, min_periods=min_periods),
+              [self._expr, other._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  def _corr_aligned(self, other, method, min_periods):
+    std_x = self.std()
+    std_y = other.std()
+    cov = self._cov_aligned(other, min_periods)
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'normalize',
+              lambda cov,
+              std_x,
+              std_y: cov / (std_x * std_y),
+              [cov._expr, std_x._expr, std_y._expr],
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def cov(self, other, min_periods, ddof):
+    x, y = self.dropna().align(other.dropna(), 'inner')
+    return x._cov_aligned(y, min_periods, ddof)
+
+  def _cov_aligned(self, other, min_periods, ddof=1):
+    # Use the formulae from
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Covariance
+    def compute_co_moments(x, y):
+      n = len(x)
+      if n <= 1:
+        c = 0
+      else:
+        c = x.corr(y) * x.std() * y.std() * (n - 1)

Review comment:
       Good point. Done.

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -36,7 +36,7 @@ def _run_test(self, func, *args):
             expressions.ConstantExpression(arg, arg[0:0])) for arg in args
     ]
     expected = func(*args)
-    actual = expressions.PartitioningSession({}).evaluate(
+    actual = expressions.Session({}).evaluate(

Review comment:
       I've made this an option; we can try to switch it in the future.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org