You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/07/06 21:42:14 UTC

[beam] branch master updated: Fix documentation about hand implemented global aggregations (#22173)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a21bda843cf Fix documentation about hand implemented global aggregations (#22173)
a21bda843cf is described below

commit a21bda843cf7dc53fe438ec00a4fbfdd6e1d8262
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed Jul 6 14:42:07 2022 -0700

    Fix documentation about hand implemented global aggregations (#22173)
---
 sdks/python/apache_beam/dataframe/frames.py | 37 ++++++++++++++++++-----------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 8368fe82eda..4f47efdad06 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -103,7 +103,10 @@ def _fillna_alias(method):
           frame_base.populate_defaults(pd.DataFrame)(wrapper)))
 
 
+# These aggregations are commutative and associative, so they can be trivially
+# "lifted" (i.e. we can pre-aggregate on partitions, group, then post-aggregate)
 LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
+# These aggregations can be lifted if post-aggregated with "sum"
 LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
 UNLIFTABLE_AGGREGATIONS = [
     'mean',
@@ -115,10 +118,6 @@ UNLIFTABLE_AGGREGATIONS = [
     'skew',
     'kurt',
     'kurtosis',
-    # TODO: The below all have specialized distributed
-    # implementations, but they require tracking
-    # multiple intermediate series, which is difficult
-    # to lift in groupby
     'std',
     'var',
     'corr',
@@ -129,12 +128,30 @@ ALL_AGGREGATIONS = (
     LIFTABLE_AGGREGATIONS + LIFTABLE_WITH_SUM_AGGREGATIONS +
     UNLIFTABLE_AGGREGATIONS)
 
+# These aggregations have specialized distributed implementations on
+# DeferredSeries, which are re-used in DeferredFrame. Note they are *not* used
+# for grouped aggregations, since they generally require tracking multiple
+# intermediate series, which is difficult to lift in groupby.
+HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS = {
+    'quantile',
+    'std',
+    'var',
+    'nunique',
+    'corr',
+    'cov',
+    'skew',
+    'kurt',
+    'kurtosis'
+}
+UNLIFTABLE_GLOBAL_AGGREGATIONS = (
+    set(UNLIFTABLE_AGGREGATIONS) - set(HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS))
+
 
 def _agg_method(base, func):
   def wrapper(self, *args, **kwargs):
     return self.agg(func, *args, **kwargs)
 
-  if func in UNLIFTABLE_AGGREGATIONS:
+  if func in UNLIFTABLE_GLOBAL_AGGREGATIONS:
     wrapper.__doc__ = (
         f"``{func}`` cannot currently be parallelized. It will "
         "require collecting all data on a single node.")
@@ -1987,15 +2004,7 @@ class DeferredSeries(DeferredDataFrameOrSeries):
             "single node.")
 
       # We have specialized distributed implementations for these
-      if base_func in ('quantile',
-                       'std',
-                       'var',
-                       'nunique',
-                       'corr',
-                       'cov',
-                       'skew',
-                       'kurt',
-                       'kurtosis'):
+      if base_func in HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS:
         result = getattr(self, base_func)(*args, **kwargs)
         if isinstance(func, list):
           with expressions.allow_non_parallel_operations(True):