You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2020/08/13 19:16:55 UTC

[beam] branch master updated: [BEAM-9547] Lift associative aggregations. (#12469)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 198a583  [BEAM-9547] Lift associative aggregations. (#12469)
198a583 is described below

commit 198a583c140a34211fbeb9cb43d8692cf8bf1cd5
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Thu Aug 13 12:16:37 2020 -0700

    [BEAM-9547] Lift associative aggregations. (#12469)
---
 sdks/python/apache_beam/dataframe/expressions.py |  11 +-
 sdks/python/apache_beam/dataframe/frame_base.py  |   5 -
 sdks/python/apache_beam/dataframe/frames.py      | 136 ++++++++++++++++++-----
 sdks/python/apache_beam/dataframe/frames_test.py |  23 +++-
 sdks/python/apache_beam/dataframe/transforms.py  |  22 ++++
 5 files changed, 160 insertions(+), 37 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 7e8b782..3b85eae 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -64,7 +64,7 @@ class Expression(object):
     self._name = name
     self._proxy = proxy
     # Store for preservation through pickling.
-    self._id = _id or '%s_%s' % (name, id(self))
+    self._id = _id or '%s_%s_%s' % (name, type(proxy).__name__, id(self))
 
   def proxy(self):  # type: () -> T
     return self._proxy
@@ -255,9 +255,12 @@ def _get_allow_non_parallel():
 
 @contextlib.contextmanager
 def allow_non_parallel_operations(allow=True):
-  old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow
-  yield
-  _ALLOW_NON_PARALLEL.value = old_value
+  if allow is None:
+    yield
+  else:
+    old_value, _ALLOW_NON_PARALLEL.value = _ALLOW_NON_PARALLEL.value, allow
+    yield
+    _ALLOW_NON_PARALLEL.value = old_value
 
 
 class NonParallelOperation(Exception):
diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py
index 0b1c296..2b2b199 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -218,11 +218,6 @@ def _agg_method(func):
   return wrapper
 
 
-def _associative_agg_method(func):
-  # TODO(robertwb): Multi-level agg.
-  return _agg_method(func)
-
-
 def wont_implement_method(msg):
   def wrapper(self, *args, **kwargs):
     raise WontImplementError(msg)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 9e2e97a..aacf4ee 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -35,20 +35,41 @@ class DeferredSeries(frame_base.DeferredFrame):
   transform = frame_base._elementwise_method(
       'transform', restrictions={'axis': 0})
 
-  def agg(self, *args, **kwargs):
-    return frame_base.DeferredFrame.wrap(
-        expressions.ComputedExpression(
-            'agg',
-            lambda df: df.agg(*args, **kwargs), [self._expr],
-            preserves_partition_by=partitionings.Singleton(),
-            requires_partition_by=partitionings.Singleton()))
+  def aggregate(self, func, axis=0, *args, **kwargs):
+    if isinstance(func, list) and len(func) > 1:
+      # Aggregate each column separately, then stick them all together.
+      rows = [self.agg([f], *args, **kwargs) for f in func]
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join_aggregate',
+              lambda *rows: pd.concat(rows), [row._expr for row in rows]))
+    else:
+      # We're only handling a single column.
+      base_func = func[0] if isinstance(func, list) else func
+      if _is_associative(base_func) and not args and not kwargs:
+        intermediate = expressions.elementwise_expression(
+            'pre_aggregate',
+            lambda s: s.agg([base_func], *args, **kwargs), [self._expr])
+        allow_nonparallel_final = True
+      else:
+        intermediate = self._expr
+        allow_nonparallel_final = None  # i.e. don't change the value
+      with expressions.allow_non_parallel_operations(allow_nonparallel_final):
+        return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'aggregate',
+                lambda s: s.agg(func, *args, **kwargs), [intermediate],
+                preserves_partition_by=partitionings.Singleton(),
+                requires_partition_by=partitionings.Singleton()))
 
-  all = frame_base._associative_agg_method('all')
-  any = frame_base._associative_agg_method('any')
-  min = frame_base._associative_agg_method('min')
-  max = frame_base._associative_agg_method('max')
-  prod = product = frame_base._associative_agg_method('prod')
-  sum = frame_base._associative_agg_method('sum')
+  agg = aggregate
+
+  all = frame_base._agg_method('all')
+  any = frame_base._agg_method('any')
+  min = frame_base._agg_method('min')
+  max = frame_base._agg_method('max')
+  prod = product = frame_base._agg_method('prod')
+  sum = frame_base._agg_method('sum')
 
   cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
       'order-sensitive')
@@ -150,18 +171,62 @@ class DeferredDataFrame(frame_base.DeferredFrame):
   def loc(self):
     return _DeferredLoc(self)
 
-  @frame_base.args_to_kwargs(pd.DataFrame)
-  @frame_base.populate_defaults(pd.DataFrame)
-  def aggregate(self, axis, **kwargs):
+  def aggregate(self, func, axis=0, *args, **kwargs):
     if axis is None:
-      return self.agg(axis=1, **kwargs).agg(axis=0, **kwargs)
-    return frame_base.DeferredFrame.wrap(
+      # Aggregate across all elements by first aggregating across columns,
+      # then across rows.
+      return self.agg(func, *args, **dict(kwargs, axis=1)).agg(
+          func, *args, **dict(kwargs, axis=0))
+    elif axis in (1, 'columns'):
+      # This is an easy elementwise aggregation.
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'aggregate',
+              lambda df: df.agg(func, axis=1, *args, **kwargs),
+              [self._expr],
+              requires_partition_by=partitionings.Nothing()))
+    elif len(self._expr.proxy().columns) == 0 or args or kwargs:
+      # For these corner cases, just colocate everything.
+      return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
             'aggregate',
-            lambda df: df.agg(axis=axis, **kwargs),
+            lambda df: df.agg(func, *args, **kwargs),
             [self._expr],
-            # TODO(robertwb): Sub-aggregate when possible.
             requires_partition_by=partitionings.Singleton()))
+    else:
+      # In the general case, compute the aggregation of each column separately,
+      # then recombine.
+      if not isinstance(func, dict):
+        col_names = list(self._expr.proxy().columns)
+        func = {col: func for col in col_names}
+      else:
+        col_names = list(func.keys())
+      aggregated_cols = []
+      for col in col_names:
+        funcs = func[col]
+        if not isinstance(funcs, list):
+          funcs = [funcs]
+        aggregated_cols.append(self[col].agg(funcs, *args, **kwargs))
+      # The final shape is different depending on whether any of the columns
+      # were aggregated by a list of aggregators.
+      with expressions.allow_non_parallel_operations():
+        if any(isinstance(funcs, list) for funcs in func.values()):
+          return frame_base.DeferredFrame.wrap(
+              expressions.ComputedExpression(
+                  'join_aggregate',
+                  lambda *cols: pd.DataFrame(
+                      {col: value for col, value in zip(col_names, cols)}),
+                  [col._expr for col in aggregated_cols],
+                  requires_partition_by=partitionings.Singleton()))
+        else:
+          return frame_base.DeferredFrame.wrap(
+            expressions.ComputedExpression(
+                'join_aggregate',
+                  lambda *cols: pd.Series(
+                      {col: value[0] for col, value in zip(col_names, cols)}),
+                [col._expr for col in aggregated_cols],
+                requires_partition_by=partitionings.Singleton(),
+                proxy=self._expr.proxy().agg(func, *args, **kwargs)))
 
   agg = aggregate
 
@@ -169,16 +234,27 @@ class DeferredDataFrame(frame_base.DeferredFrame):
 
   memory_usage = frame_base.wont_implement_method('non-deferred value')
 
-  all = frame_base._associative_agg_method('all')
-  any = frame_base._associative_agg_method('any')
+  all = frame_base._agg_method('all')
+  any = frame_base._agg_method('any')
 
   cummax = cummin = cumsum = cumprod = frame_base.wont_implement_method(
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
-  max = frame_base._associative_agg_method('max')
-  min = frame_base._associative_agg_method('min')
-  mode = frame_base._agg_method('mode')
+  max = frame_base._agg_method('max')
+  min = frame_base._agg_method('min')
+
+  def mode(self, axis=0, *args, **kwargs):
+    if axis == 1 or axis == 'columns':
+      raise frame_base.WontImplementError('non-deferred column values')
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'mode',
+            lambda df: df.mode(*args, **kwargs),
+            [self._expr],
+            #TODO(robertwb): Approximate?
+            requires_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Singleton()))
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -203,7 +279,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
   isna = frame_base._elementwise_method('isna')
   notnull = notna = frame_base._elementwise_method('notna')
 
-  prod = product = frame_base._associative_agg_method('prod')
+  prod = product = frame_base._agg_method('prod')
 
   @frame_base.args_to_kwargs(pd.DataFrame)
   @frame_base.populate_defaults(pd.DataFrame)
@@ -296,7 +372,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
 
   stack = frame_base._elementwise_method('stack')
 
-  sum = frame_base._associative_agg_method('sum')
+  sum = frame_base._agg_method('sum')
 
   to_records = to_dict = to_numpy = to_string = (
       frame_base.wont_implement_method('non-deferred value'))
@@ -390,6 +466,12 @@ for meth in UNLIFTABLE_AGGREGATIONS:
   setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
 
 
+def _is_associative(func):
+  return func in LIFTABLE_AGGREGATIONS or (
+      getattr(func, '__name__', None) in LIFTABLE_AGGREGATIONS
+      and func.__module__ in ('numpy', 'builtins'))
+
+
 class _DeferredLoc(object):
   def __init__(self, frame):
     self._frame = frame
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 773b3ba..7d975eb 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -16,6 +16,7 @@
 
 from __future__ import absolute_import
 
+import sys
 import unittest
 
 import numpy as np
@@ -36,7 +37,7 @@ class DeferredFrameTest(unittest.TestCase):
     expected = func(*args)
     actual = expressions.Session({}).evaluate(func(*deferred_args)._expr)
     self.assertTrue(
-        expected.equals(actual),
+        getattr(expected, 'equals', expected.__eq__)(actual),
         'Expected:\n\n%r\n\nActual:\n\n%r' % (expected, actual))
 
   def test_series_arithmetic(self):
@@ -81,6 +82,26 @@ class DeferredFrameTest(unittest.TestCase):
     self._run_test(lambda df: df.loc[df.A > 10], df)
     self._run_test(lambda df: df.loc[lambda df: df.A > 10], df)
 
+  def test_series_agg(self):
+    s = pd.Series(list(range(16)))
+    self._run_test(lambda s: s.agg('sum'), s)
+    self._run_test(lambda s: s.agg(['sum']), s)
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(lambda s: s.agg(['sum', 'mean']), s)
+      self._run_test(lambda s: s.agg(['mean']), s)
+      self._run_test(lambda s: s.agg('mean'), s)
+
+  @unittest.skipIf(sys.version_info < (3, 6), 'Nondeterministic dict ordering.')
+  def test_dataframe_agg(self):
+    df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
+    self._run_test(lambda df: df.agg('sum'), df)
+    with beam.dataframe.allow_non_parallel_operations():
+      self._run_test(lambda df: df.agg(['sum', 'mean']), df)
+      self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
+      self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
+      self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
+      self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
+
 
 class AllowNonParallelTest(unittest.TestCase):
   def _use_non_parallel_operation(self):
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index a270458..ed497f6 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -125,6 +125,7 @@ class _DataframeExpressionsTransform(transforms.PTransform):
         return '%s:%s' % (self.stage.ops, id(self))
 
       def expand(self, pcolls):
+
         scalar_inputs = [expr for expr in self.stage.inputs if is_scalar(expr)]
         tabular_inputs = [
             expr for expr in self.stage.inputs if not is_scalar(expr)
@@ -180,6 +181,22 @@ class _DataframeExpressionsTransform(transforms.PTransform):
         self.ops = []
         self.outputs = set()
 
+      def __repr__(self, indent=0):
+        if indent:
+          sep = '\n' + ' ' * indent
+        else:
+          sep = ''
+        return (
+            "Stage[%sinputs=%s, %spartitioning=%s, %sops=%s, %soutputs=%s]" % (
+                sep,
+                self.inputs,
+                sep,
+                self.partitioning,
+                sep,
+                self.ops,
+                sep,
+                self.outputs))
+
     # First define some helper functions.
     def output_is_partitioned_by(expr, stage, partitioning):
       if partitioning == partitionings.Nothing():
@@ -244,6 +261,11 @@ class _DataframeExpressionsTransform(transforms.PTransform):
             # It also must be declared as an output of the producing stage.
             expr_to_stage(arg).outputs.add(arg)
       stage.ops.append(expr)
+      # Ensure that any inputs for the overall transform are added
+      # in downstream stages.
+      for arg in expr.args():
+        if arg in inputs:
+          stage.inputs.add(arg)
       # This is a list as given expression may be available in many stages.
       return [stage]