You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/13 19:16:55 UTC
[beam] branch master updated: [BEAM-9547] Lift associative aggregations. (#12469)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 198a583 [BEAM-9547] Lift associative aggregations. (#12469)
198a583 is described below
commit 198a583c140a34211fbeb9cb43d8692cf8bf1cd5
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Aug 13 12:16:37 2020 -0700
[BEAM-9547] Lift associative aggregations. (#12469)
---
sdks/python/apache_beam/dataframe/expressions.py | 11 +-
sdks/python/apache_beam/dataframe/frame_base.py | 5 -
sdks/python/apache_beam/dataframe/frames.py | 136 ++++++++++++++++++-----
sdks/python/apache_beam/dataframe/frames_test.py | 23 +++-
sdks/python/apache_beam/dataframe/transforms.py | 22 ++++
5 files changed, 160 insertions(+), 37 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 7e8b782..3b85eae 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -64,7 +64,7 @@ class Expression(object):
self._name = name
self._proxy = proxy
# Store for preservation through pickling.
- self._id = _id or '%s_%s' % (name, id(self))
+ self._id = _id or '%s_%s_%s' % (name, type(proxy).__name__, id(self))
def proxy(self): # type: () -> T
return self._proxy
@@ -255,9 +255,12 @@ def _get_allow_non_parallel():
@contextlib.contextmanager
def allow_non_parallel_operations(allow=True):
- old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow
- yield
- _ALLOW_NON_PARALLEL.value = old_value
+ if allow is None:
+ yield
+ else:
+ old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow
+ yield
+ _ALLOW_NON_PARALLEL.value = old_value
class NonParallelOperation(Exception):
diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py
index 0b1c296..2b2b199 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -218,11 +218,6 @@ def _agg_method(func):
return wrapper
-def _associative_agg_method(func):
- # TODO(robertwb): Multi-level agg.
- return _agg_method(func)
-
-
def wont_implement_method(msg):
def wrapper(self, *args, **kwargs):
raise WontImplementError(msg)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 9e2e97a..aacf4ee 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -35,20 +35,41 @@ class DeferredSeries(frame_base.DeferredFrame):
transform = frame_base._elementwise_method(
'transform', restrictions={'axis': 0})
- def agg(self, *args, **kwargs):
- return frame_base.DeferredFrame.wrap(
- expressions.ComputedExpression(
- 'agg',
- lambda df: df.agg(*args, **kwargs), [self._expr],
- preserves_partition_by=partitionings.Singleton(),
- requires_partition_by=partitionings.Singleton()))
+ def aggregate(self, func, axis=0, *args, **kwargs):
+ if isinstance(func, list) and len(func) > 1:
+ # Aggregate each column separately, then stick them all together.
+ rows = [self.agg([f], *args, **kwargs) for f in func]
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'join_aggregate',
+ lambda *rows: pd.concat(rows), [row._expr for row in rows]))
+ else:
+ # We're only handling a single column.
+ base_func = func[0] if isinstance(func, list) else func
+ if _is_associative(base_func) and not args and not kwargs:
+ intermediate = expressions.elementwise_expression(
+ 'pre_aggregate',
+ lambda s: s.agg([base_func], *args, **kwargs), [self._expr])
+ allow_nonparallel_final = True
+ else:
+ intermediate = self._expr
+ allow_nonparallel_final = None # i.e. don't change the value
+ with expressions.allow_non_parallel_operations(allow_nonparallel_final):
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'aggregate',
+ lambda s: s.agg(func, *args, **kwargs), [intermediate],
+ preserves_partition_by=partitionings.Singleton(),
+ requires_partition_by=partitionings.Singleton()))
- all = frame_base._associative_agg_method('all')
- any = frame_base._associative_agg_method('any')
- min = frame_base._associative_agg_method('min')
- max = frame_base._associative_agg_method('max')
- prod = product = frame_base._associative_agg_method('prod')
- sum = frame_base._associative_agg_method('sum')
+ agg = aggregate
+
+ all = frame_base._agg_method('all')
+ any = frame_base._agg_method('any')
+ min = frame_base._agg_method('min')
+ max = frame_base._agg_method('max')
+ prod = product = frame_base._agg_method('prod')
+ sum = frame_base._agg_method('sum')
cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
'order-sensitive')
@@ -150,18 +171,62 @@ class DeferredDataFrame(frame_base.DeferredFrame):
def loc(self):
return _DeferredLoc(self)
- @frame_base.args_to_kwargs(pd.DataFrame)
- @frame_base.populate_defaults(pd.DataFrame)
- def aggregate(self, axis, **kwargs):
+ def aggregate(self, func, axis=0, *args, **kwargs):
if axis is None:
- return self.agg(axis=1, **kwargs).agg(axis=0, **kwargs)
- return frame_base.DeferredFrame.wrap(
+ # Aggregate across all elements by first aggregating across columns,
+ # then across rows.
+ return self.agg(func, *args, **dict(kwargs, axis=1)).agg(
+ func, *args, **dict(kwargs, axis=0))
+ elif axis in (1, 'columns'):
+ # This is an easy elementwise aggregation.
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'aggregate',
+ lambda df: df.agg(func, axis=1, *args, **kwargs),
+ [self._expr],
+ requires_partition_by=partitionings.Nothing()))
+ elif len(self._expr.proxy().columns) == 0 or args or kwargs:
+ # For these corner cases, just colocate everything.
+ return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
'aggregate',
- lambda df: df.agg(axis=axis, **kwargs),
+ lambda df: df.agg(func, *args, **kwargs),
[self._expr],
- # TODO(robertwb): Sub-aggregate when possible.
requires_partition_by=partitionings.Singleton()))
+ else:
+ # In the general case, compute the aggregation of each column separately,
+ # then recombine.
+ if not isinstance(func, dict):
+ col_names = list(self._expr.proxy().columns)
+ func = {col: func for col in col_names}
+ else:
+ col_names = list(func.keys())
+ aggregated_cols = []
+ for col in col_names:
+ funcs = func[col]
+ if not isinstance(funcs, list):
+ funcs = [funcs]
+ aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
+ # The final shape is different depending on whether any of the columns
+ # were aggregated by a list of aggregators.
+ with expressions.allow_non_parallel_operations():
+ if any(isinstance(funcs, list) for funcs in func.values()):
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'join_aggregate',
+ lambda *cols: pd.DataFrame(
+ {col: value for col, value in zip(col_names, cols)}),
+ [col._expr for col in aggregated_cols],
+ requires_partition_by=partitionings.Singleton()))
+ else:
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'join_aggregate',
+ lambda *cols: pd.Series(
+ {col: value[0] for col, value in zip(col_names, cols)}),
+ [col._expr for col in aggregated_cols],
+ requires_partition_by=partitionings.Singleton(),
+ proxy=self._expr.proxy().agg(func, *args, **kwargs)))
agg = aggregate
@@ -169,16 +234,27 @@ class DeferredDataFrame(frame_base.DeferredFrame):
memory_usage = frame_base.wont_implement_method('non-deferred value')
- all = frame_base._associative_agg_method('all')
- any = frame_base._associative_agg_method('any')
+ all = frame_base._agg_method('all')
+ any = frame_base._agg_method('any')
cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
'order-sensitive')
diff = frame_base.wont_implement_method('order-sensitive')
- max = frame_base._associative_agg_method('max')
- min = frame_base._associative_agg_method('min')
- mode = frame_base._agg_method('mode')
+ max = frame_base._agg_method('max')
+ min = frame_base._agg_method('min')
+
+ def mode(self, axis=0, *args, **kwargs):
+ if axis == 1 or axis == 'columns':
+ raise frame_base.WontImplementError('non-deferred column values')
+ return frame_base.DeferredFrame.wrap(
+ expressions.ComputedExpression(
+ 'mode',
+ lambda df: df.mode(*args, **kwargs),
+ [self._expr],
+ #TODO(robertwb): Approximate?
+ requires_partition_by=partitionings.Singleton(),
+ preserves_partition_by=partitionings.Singleton()))
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@@ -203,7 +279,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
isna = frame_base._elementwise_method('isna')
notnull = notna = frame_base._elementwise_method('notna')
- prod = product = frame_base._associative_agg_method('prod')
+ prod = product = frame_base._agg_method('prod')
@frame_base.args_to_kwargs(pd.DataFrame)
@frame_base.populate_defaults(pd.DataFrame)
@@ -296,7 +372,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
stack = frame_base._elementwise_method('stack')
- sum = frame_base._associative_agg_method('sum')
+ sum = frame_base._agg_method('sum')
to_records = to_dict = to_numpy = to_string = (
frame_base.wont_implement_method('non-deferred value'))
@@ -390,6 +466,12 @@ for meth in UNLIFTABLE_AGGREGATIONS:
setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
+def _is_associative(func):
+ return func in LIFTABLE_AGGREGATIONS or (
+ getattr(func, '__name__', None) in LIFTABLE_AGGREGATIONS
+ and func.__module__ in ('numpy', 'builtins'))
+
+
class _DeferredLoc(object):
def __init__(self, frame):
self._frame = frame
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 773b3ba..7d975eb 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -16,6 +16,7 @@
from __future__ import absolute_import
+import sys
import unittest
import numpy as np
@@ -36,7 +37,7 @@ class DeferredFrameTest(unittest.TestCase):
expected = func(*args)
actual = expressions.Session({}).evaluate(func(*deferred_args)._expr)
self.assertTrue(
- expected.equals(actual),
+ getattr(expected, 'equals', expected.__eq__)(actual),
'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
def test_series_arithmetic(self):
@@ -81,6 +82,26 @@ class DeferredFrameTest(unittest.TestCase):
self._run_test(lambda df: df.loc[df.A > 10], df)
self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)
+ def test_series_agg(self):
+ s = pd.Series(list(range(16)))
+ self._run_test(lambda s: s.agg('sum'), s)
+ self._run_test(lambda s: s.agg(['sum']), s)
+ with beam.dataframe.allow_non_parallel_operations():
+ self._run_test(lambda s: s.agg(['sum', 'mean']), s)
+ self._run_test(lambda s: s.agg(['mean']), s)
+ self._run_test(lambda s: s.agg('mean'), s)
+
+ @unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.')
+ def test_dataframe_agg(self):
+ df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
+ self._run_test(lambda df: df.agg('sum'), df)
+ with beam.dataframe.allow_non_parallel_operations():
+ self._run_test(lambda df: df.agg(['sum', 'mean']), df)
+ self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
+ self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
+ self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
+ self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
+
class AllowNonParallelTest(unittest.TestCase):
def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index a270458..ed497f6 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -125,6 +125,7 @@ class _DataframeExpressionsTransform(transforms.PTransform):
return '%s:%s' % (self.stage.ops, id(self))
def expand(self, pcolls):
+
scalar_inputs = [expr for expr in self.stage.inputs if is_scalar(expr)]
tabular_inputs = [
expr for expr in self.stage.inputs if not is_scalar(expr)
@@ -180,6 +181,22 @@ class _DataframeExpressionsTransform(transforms.PTransform):
self.ops = []
self.outputs = set()
+ def __repr__(self, indent=0):
+ if indent:
+ sep = '\n' + ' ' * indent
+ else:
+ sep = ''
+ return (
+ "Stage[%sinputs=%s, %spartitioning=%s, %sops=%s, %soutputs=%s]" % (
+ sep,
+ self.inputs,
+ sep,
+ self.partitioning,
+ sep,
+ self.ops,
+ sep,
+ self.outputs))
+
# First define some helper functions.
def output_is_partitioned_by(expr, stage, partitioning):
if partitioning == partitionings.Nothing():
@@ -244,6 +261,11 @@ class _DataframeExpressionsTransform(transforms.PTransform):
# It also must be declared as an output of the producing stage.
expr_to_stage(arg).outputs.add(arg)
stage.ops.append(expr)
+ # Ensure that any inputs for the overall transform are added
+ # in downstream stages.
+ for arg in expr.args():
+ if arg in inputs:
+ stage.inputs.add(arg)
# This is a list as given expression may be available in many stages.
return [stage]