You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/11/01 19:40:00 UTC

[jira] [Work logged] (BEAM-12550) Implement parallelizable skew and kurtosis

     [ https://issues.apache.org/jira/browse/BEAM-12550?focusedWorklogId=672813&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-672813 ]

ASF GitHub Bot logged work on BEAM-12550:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 01/Nov/21 19:39
            Start Date: 01/Nov/21 19:39
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #15809:
URL: https://github.com/apache/beam/pull/15809#discussion_r740461343



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())

Review comment:
       This uses the same logic as the `elif` block. I think the intention is to use the pandas `skew` here since we think it is more efficient, right?
   
   Could you also run some microbenchmarks to see if the approach with pandas `skew` actually is more performant?

##########
File path: sdks/python/apache_beam/dataframe/frames_test.py
##########
@@ -1397,7 +1398,6 @@ def test_series_agg(self, agg_method):
         'median',
         'sem',
         'mad',
-        'skew',

Review comment:
       There are a few other places where this is copied, can you make the change there as well? That should fix the precommit failures
   
   Alternatively, it could be nice to have the list of non parallelizable aggregations defined in one place in a constant, then just update it there.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n

Review comment:
       nit: you're computing `n + datum.n` in several places. Ccould you just reference `combined_n` everywhere?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n
+
+      if n < 3:
+        return float('nan')
+      elif m == 0:
+        return float(0)
+      else:
+        return combined_n * math.sqrt(combined_n - 1) / (combined_n -
+                                                         2) * third_moment / (
+                                                             m**(3 / 2))

Review comment:
       Could you add a comment here that this is scaling for _unbiased_ (aka sample) skewness, which is the reason it's slightly different from the equation at https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics, which is producing biased (aka population) skew.
   
   You might also add a link to https://en.wikipedia.org/wiki/Skewness#Sample_skewness which has definitions for each.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n
+          combined_n = n + datum.n
+          third_moment += datum.third_moment + (
+              (delta**3 * ((n_a * n_b) * (n_a - n_b)) / ((combined_n)**2)) +
+              ((3 * delta) * ((n_a * m) - (n_b * datum.m)) / (combined_n)))
+          m += datum.m + delta**2 * n * datum.n / (n + datum.n)
+          s += datum.s
+          n += datum.n
+
+      if n < 3:
+        return float('nan')
+      elif m == 0:
+        return float(0)

Review comment:
       Is it always true that skew is 0 if the second moment is 0? I'm wondering what happens if m2 is 0 and m3 is non-zero. Perhaps that's not possible. I think m2 can only be 0 if every element is equal. Does that seem right @robertwb?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1430,6 +1430,72 @@ def corr(self, other, method, min_periods):
               [self._expr, other._expr],
               requires_partition_by=partitionings.Singleton(reason=reason)))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def skew(self, axis, skipna, level, numeric_only, **kwargs):
+    if level is not None:
+      raise NotImplementedError("per-level aggregation")
+    if skipna is None or skipna:
+      self = self.dropna()  # pylint: disable=self-cls-assignment
+    # See the online, numerically stable formulae at
+    # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
+    def compute_moments(x):
+      n = len(x)
+      if n == 0:
+        m, s, third_moment = 0, 0, 0
+      elif n < 3:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      else:
+        m = x.std(ddof=0)**2 * n
+        s = x.sum()
+        third_moment = (((x - x.mean())**3).sum())
+      return pd.DataFrame(
+          dict(m=[m], s=[s], n=[n], third_moment=[third_moment]))
+
+    def combine_moments(data):
+      m = s = n = third_moment = 0.0
+      for datum in data.itertuples():
+        if datum.n == 0:
+          continue
+        elif n == 0:
+          m, s, n, third_moment = datum.m, datum.s, datum.n, datum.third_moment
+        else:
+          mean_b = s / n
+          mean_a = datum.s / datum.n
+          delta = mean_b - mean_a
+          n_a = datum.n
+          n_b = n

Review comment:
       I think defining `n_a` and `n_b` here just confuses things, for a couple of reasons:
   1. You're not using them everywhere (the `m` calculation still references the original names).
   2. You're only doing it for `n`. What about the other variables that are being accumulated (`s`, `m`, `third_moment`)?
   
   I think this code should either a) define variables like this for everything that's being accumulated, and reference them everyhwere, or b) just reference the original names. It's up to you which you'd prefer. I think (a) improved clarity substantially, but at the cost of some performance.
   
   Speaking of clarity, maybe `m` should be called `second_moment`, or `m2`, to distinguish it from the third moment?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 672813)
    Time Spent: 1.5h  (was: 1h 20m)

> Implement parallelizable skew and kurtosis 
> -------------------------------------------
>
>                 Key: BEAM-12550
>                 URL: https://issues.apache.org/jira/browse/BEAM-12550
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-dataframe
>            Reporter: Brian Hulette
>            Assignee: Svetak Vihaan Sundhar
>            Priority: P3
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> skew and kurtosis should be parallelizable/lifftable by using a similar [approach as std and var|https://github.com/apache/beam/blob/a0f5e932d8a9aa491b16361abdc629b5e9a483f6/sdks/python/apache_beam/dataframe/frames.py#L1307-L1310]. See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Higher-order_statistics
> which has information on extending that approach to calculating the third and fourth central moments, needed for skew and kurtosis.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)