You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/07/06 21:42:14 UTC
[beam] branch master updated: Fix documentation about hand implemented global aggregations (#22173)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a21bda843cf Fix documentation about hand implemented global aggregations (#22173)
a21bda843cf is described below
commit a21bda843cf7dc53fe438ec00a4fbfdd6e1d8262
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed Jul 6 14:42:07 2022 -0700
Fix documentation about hand implemented global aggregations (#22173)
---
sdks/python/apache_beam/dataframe/frames.py | 37 ++++++++++++++++++-----------
1 file changed, 23 insertions(+), 14 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 8368fe82eda..4f47efdad06 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -103,7 +103,10 @@ def _fillna_alias(method):
frame_base.populate_defaults(pd.DataFrame)(wrapper)))
+# These aggregations are commutative and associative, so they can be trivially
+# "lifted" (i.e. we can pre-aggregate on partitions, group, then post-aggregate)
LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
+# These aggregations can be lifted if post-aggregated with "sum"
LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
UNLIFTABLE_AGGREGATIONS = [
'mean',
@@ -115,10 +118,6 @@ UNLIFTABLE_AGGREGATIONS = [
'skew',
'kurt',
'kurtosis',
- # TODO: The below all have specialized distributed
- # implementations, but they require tracking
- # multiple intermediate series, which is difficult
- # to lift in groupby
'std',
'var',
'corr',
@@ -129,12 +128,30 @@ ALL_AGGREGATIONS = (
LIFTABLE_AGGREGATIONS + LIFTABLE_WITH_SUM_AGGREGATIONS +
UNLIFTABLE_AGGREGATIONS)
+# These aggregations have specialized distributed implementations on
+# DeferredSeries, which are re-used in DeferredFrame. Note they are *not* used
+# for grouped aggregations, since they generally require tracking multiple
+# intermediate series, which is difficult to lift in groupby.
+HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS = {
+ 'quantile',
+ 'std',
+ 'var',
+ 'nunique',
+ 'corr',
+ 'cov',
+ 'skew',
+ 'kurt',
+ 'kurtosis'
+}
+UNLIFTABLE_GLOBAL_AGGREGATIONS = (
+ set(UNLIFTABLE_AGGREGATIONS) - set(HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS))
+
def _agg_method(base, func):
def wrapper(self, *args, **kwargs):
return self.agg(func, *args, **kwargs)
- if func in UNLIFTABLE_AGGREGATIONS:
+ if func in UNLIFTABLE_GLOBAL_AGGREGATIONS:
wrapper.__doc__ = (
f"``{func}`` cannot currently be parallelized. It will "
"require collecting all data on a single node.")
@@ -1987,15 +2004,7 @@ class DeferredSeries(DeferredDataFrameOrSeries):
"single node.")
# We have specialized distributed implementations for these
- if base_func in ('quantile',
- 'std',
- 'var',
- 'nunique',
- 'corr',
- 'cov',
- 'skew',
- 'kurt',
- 'kurtosis'):
+ if base_func in HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS:
result = getattr(self, base_func)(*args, **kwargs)
if isinstance(func, list):
with expressions.allow_non_parallel_operations(True):