You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/11/24 23:04:22 UTC

[beam] branch release-2.26.0 updated: [release-2.26.0][BEAM-11324] Fix some partitioning errors that can lead to execution-time errors (#13418)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.26.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.26.0 by this push:
     new 3efb955  [release-2.26.0][BEAM-11324] Fix some partitioning errors that can lead to execution-time errors (#13418)
3efb955 is described below

commit 3efb95516f5943283aae2ca122e20936fe5f4eae
Author: Brian Hulette <bh...@google.com>
AuthorDate: Tue Nov 24 15:03:37 2020 -0800

    [release-2.26.0][BEAM-11324] Fix some partitioning errors that can lead to execution-time errors (#13418)
    
    * set_index does not preserve partitioning (#13398)
    
    * [BEAM-11324] Add additional verification in PartitioningSession (#13401)
    
    * Default to distributed=True in frames_test.py:
    
    * Add additional partitioning verification to PartitioningSession
    
    * Series pre-agg doesn't preserve partitioning
    
    * reset_index doesn't preserve partitioning
    
    * remove whitespace
    
    * store and recover random state, clarify loop
    
    * add TODO
    
    * map_index doesn't preserve partitioning
    
    * lint
    
    * lint
    
    * lint
    
    * Disallow grouping by a series
    
    * lint
    
    * lint
    
    * import order
    
    * fix whitespace merge error
---
 sdks/python/apache_beam/dataframe/expressions.py   |  92 ++++++++++++-----
 sdks/python/apache_beam/dataframe/frames.py        |  32 ++----
 sdks/python/apache_beam/dataframe/frames_test.py   | 115 ++++++++++-----------
 .../apache_beam/dataframe/pandas_doctests_test.py  |   2 +
 sdks/python/apache_beam/dataframe/partitionings.py |  26 +++++
 5 files changed, 155 insertions(+), 112 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/expressions.py b/sdks/python/apache_beam/dataframe/expressions.py
index 8697eaf..e275795 100644
--- a/sdks/python/apache_beam/dataframe/expressions.py
+++ b/sdks/python/apache_beam/dataframe/expressions.py
@@ -17,6 +17,7 @@
 from __future__ import absolute_import
 
 import contextlib
+import random
 import threading
 from typing import Any
 from typing import Callable
@@ -48,10 +49,15 @@ class Session(object):
 class PartitioningSession(Session):
   """An extension of Session that enforces actual partitioning of inputs.
 
-  When evaluating an expression, inputs are partitioned according to its
-  `requires_partition_by` specifications, the expression is evaluated on each
-  partition separately, and the final result concatinated, as if this were
-  actually executed in a parallel manner.
+  Each expression is evaluated multiple times for various supported
+  partitionings determined by its `requires_partition_by` specification. For
+  each tested partitioning, the input is partitioned and the expression is
+  evaluated on each partition separately, as if this were actually executed in
+  a parallel manner.
+
+  For each input partitioning, the results are verified to be partitioned
+  appropriately according to the expression's `preserves_partition_by`
+  specification.
 
   For testing only.
   """
@@ -67,28 +73,62 @@ class PartitioningSession(Session):
         result = super(PartitioningSession, self).evaluate(expr)
       else:
         scaler_args = [arg for arg in expr.args() if is_scalar(arg)]
-        parts = collections.defaultdict(
-            lambda: Session({arg: self.evaluate(arg)
-                             for arg in scaler_args}))
-        for arg in expr.args():
-          if not is_scalar(arg):
-            input = self.evaluate(arg)
-            for key, part in expr.requires_partition_by().test_partition_fn(
-                input):
-              parts[key]._bindings[arg] = part
-        if not parts:
-          parts[None]  # Create at least one entry.
-
-        results = []
-        for session in parts.values():
-          if any(len(session.lookup(arg)) for arg in expr.args()
-                 if not is_scalar(arg)):
-            results.append(session.evaluate(expr))
-        if results:
-          result = pd.concat(results)
-        else:
-          # Choose any single session.
-          result = next(iter(parts.values())).evaluate(expr)
+
+        def evaluate_with(input_partitioning):
+          parts = collections.defaultdict(
+              lambda: Session({arg: self.evaluate(arg)
+                               for arg in scaler_args}))
+          for arg in expr.args():
+            if not is_scalar(arg):
+              input = self.evaluate(arg)
+              for key, part in input_partitioning.test_partition_fn(input):
+                parts[key]._bindings[arg] = part
+          if not parts:
+            parts[None]  # Create at least one entry.
+
+          results = []
+          for session in parts.values():
+            if any(len(session.lookup(arg)) for arg in expr.args()
+                   if not is_scalar(arg)):
+              results.append(session.evaluate(expr))
+
+          expected_output_partitioning = expr.preserves_partition_by(
+          ) if input_partitioning.is_subpartitioning_of(
+              expr.preserves_partition_by()) else input_partitioning
+
+          if not expected_output_partitioning.check(results):
+            raise AssertionError(
+                f"""Expression does not preserve partitioning!
+                Expression: {expr}
+                Requires: {expr.requires_partition_by()}
+                Preserves: {expr.preserves_partition_by()}
+                Input partitioning: {input_partitioning}
+                Expected output partitioning: {expected_output_partitioning}
+                """)
+
+          if results:
+            return pd.concat(results)
+          else:
+            # Choose any single session.
+            return next(iter(parts.values())).evaluate(expr)
+
+        # Store random state so it can be re-used for each execution, in case
+        # the expression is part of a test that relies on the random seed.
+        random_state = random.getstate()
+
+        for input_partitioning in set([expr.requires_partition_by(),
+                                       partitionings.Nothing(),
+                                       partitionings.Index(),
+                                       partitionings.Singleton()]):
+          if not input_partitioning.is_subpartitioning_of(
+              expr.requires_partition_by()):
+            continue
+
+          random.setstate(random_state)
+
+          # TODO(BEAM-11324): Consider verifying result is always the same
+          result = evaluate_with(input_partitioning)
+
         self._bindings[expr] = result
     return self._bindings[expr]
 
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 4c3fb68..f27533f 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -137,29 +137,13 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
           'map_index',
           map_index, [self._expr],
           requires_partition_by=partitionings.Nothing(),
-          preserves_partition_by=partitionings.Singleton())
+          preserves_partition_by=partitionings.Nothing())
 
     elif isinstance(by, DeferredSeries):
 
-      if isinstance(self, DeferredSeries):
-
-        def set_index(s, by):
-          df = pd.DataFrame(s)
-          df, by = df.align(by, axis=0, join='inner')
-          return df.set_index(by).iloc[:, 0]
-
-      else:
-
-        def set_index(df, by):  # type: ignore
-          df, by = df.align(by, axis=0, join='inner')
-          return df.set_index(by)
-
-      to_group = expressions.ComputedExpression(
-          'set_index',
-          set_index,  #
-          [self._expr, by._expr],
-          requires_partition_by=partitionings.Index(),
-          preserves_partition_by=partitionings.Singleton())
+      raise NotImplementedError(
+          "grouping by a Series is not yet implemented. You can group by a "
+          "DataFrame column by specifying its name.")
 
     elif isinstance(by, np.ndarray):
       raise frame_base.WontImplementError('order sensitive')
@@ -467,9 +451,11 @@ class DeferredSeries(DeferredDataFrameOrSeries):
       # We're only handling a single column.
       base_func = func[0] if isinstance(func, list) else func
       if _is_associative(base_func) and not args and not kwargs:
-        intermediate = expressions.elementwise_expression(
+        intermediate = expressions.ComputedExpression(
             'pre_aggregate',
-            lambda s: s.agg([base_func], *args, **kwargs), [self._expr])
+            lambda s: s.agg([base_func], *args, **kwargs), [self._expr],
+            requires_partition_by=partitionings.Nothing(),
+            preserves_partition_by=partitionings.Nothing())
         allow_nonparallel_final = True
       else:
         intermediate = self._expr
@@ -1377,7 +1363,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
             'reset_index',
             lambda df: df.reset_index(level=level, **kwargs),
             [self._expr],
-            preserves_partition_by=partitionings.Singleton(),
+            preserves_partition_by=partitionings.Nothing(),
             requires_partition_by=requires_partition_by))
 
   round = frame_base._elementwise_method('round')
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index ef9281d..79a38230 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -30,7 +30,7 @@ from apache_beam.dataframe import frames  # pylint: disable=unused-import
 
 
 class DeferredFrameTest(unittest.TestCase):
-  def _run_test(self, func, *args, distributed=False):
+  def _run_test(self, func, *args, distributed=True):
     deferred_args = [
         frame_base.DeferredFrame.wrap(
             expressions.ConstantExpression(arg, arg[0:0])) for arg in args
@@ -93,42 +93,36 @@ class DeferredFrameTest(unittest.TestCase):
         'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
         'value': [None if i % 11 == 0 else i for i in range(100)]
     })
-    self._run_test(
-        lambda df: df.groupby('group').agg(sum), df, distributed=True)
-    self._run_test(lambda df: df.groupby('group').sum(), df, distributed=True)
-    self._run_test(
-        lambda df: df.groupby('group').median(), df, distributed=True)
-    self._run_test(lambda df: df.groupby('group').size(), df, distributed=True)
-    self._run_test(lambda df: df.groupby('group').count(), df, distributed=True)
-    self._run_test(lambda df: df.groupby('group').max(), df, distributed=True)
-    self._run_test(lambda df: df.groupby('group').min(), df, distributed=True)
-    self._run_test(lambda df: df.groupby('group').mean(), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').agg(sum), df)
+    self._run_test(lambda df: df.groupby('group').sum(), df)
+    self._run_test(lambda df: df.groupby('group').median(), df)
+    self._run_test(lambda df: df.groupby('group').size(), df)
+    self._run_test(lambda df: df.groupby('group').count(), df)
+    self._run_test(lambda df: df.groupby('group').max(), df)
+    self._run_test(lambda df: df.groupby('group').min(), df)
+    self._run_test(lambda df: df.groupby('group').mean(), df)
+
+    self._run_test(lambda df: df[df.value > 30].groupby('group').sum(), df)
+    self._run_test(lambda df: df[df.value > 30].groupby('group').mean(), df)
+    self._run_test(lambda df: df[df.value > 30].groupby('group').size(), df)
+
+    # Grouping by a series is not currently supported
+    #self._run_test(lambda df: df[df.value > 40].groupby(df.group).sum(), df)
+    #self._run_test(lambda df: df[df.value > 40].groupby(df.group).mean(), df)
+    #self._run_test(lambda df: df[df.value > 40].groupby(df.group).size(), df)
+
+    # Example from https://pandas.pydata.org/docs/user_guide/groupby.html
+    arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
+              ['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
+
+    index = pd.MultiIndex.from_arrays(arrays, names=['first', 'second'])
 
-    self._run_test(
-        lambda df: df[df.value > 30].groupby('group').sum(),
-        df,
-        distributed=True)
-    self._run_test(
-        lambda df: df[df.value > 30].groupby('group').mean(),
-        df,
-        distributed=True)
-    self._run_test(
-        lambda df: df[df.value > 30].groupby('group').size(),
-        df,
-        distributed=True)
+    df = pd.DataFrame({
+        'A': [1, 1, 1, 1, 2, 2, 3, 3], 'B': np.arange(8)
+    },
+                      index=index)
 
-    self._run_test(
-        lambda df: df[df.value > 40].groupby(df.group).sum(),
-        df,
-        distributed=True)
-    self._run_test(
-        lambda df: df[df.value > 40].groupby(df.group).mean(),
-        df,
-        distributed=True)
-    self._run_test(
-        lambda df: df[df.value > 40].groupby(df.group).size(),
-        df,
-        distributed=True)
+    self._run_test(lambda df: df.groupby(['second', 'A']).sum(), df)
 
   @unittest.skipIf(sys.version_info <= (3, ), 'differing signature')
   def test_merge(self):
@@ -160,24 +154,24 @@ class DeferredFrameTest(unittest.TestCase):
 
   def test_series_getitem(self):
     s = pd.Series([x**2 for x in range(10)])
-    self._run_test(lambda s: s[...], s, distributed=True)
-    self._run_test(lambda s: s[:], s, distributed=True)
-    self._run_test(lambda s: s[s < 10], s, distributed=True)
-    self._run_test(lambda s: s[lambda s: s < 10], s, distributed=True)
+    self._run_test(lambda s: s[...], s)
+    self._run_test(lambda s: s[:], s)
+    self._run_test(lambda s: s[s < 10], s)
+    self._run_test(lambda s: s[lambda s: s < 10], s)
 
     s.index = s.index.map(float)
-    self._run_test(lambda s: s[1.5:6], s, distributed=True)
+    self._run_test(lambda s: s[1.5:6], s)
 
   def test_dataframe_getitem(self):
     df = pd.DataFrame({'A': [x**2 for x in range(6)], 'B': list('abcdef')})
-    self._run_test(lambda df: df['A'], df, distributed=True)
-    self._run_test(lambda df: df[['A', 'B']], df, distributed=True)
+    self._run_test(lambda df: df['A'], df)
+    self._run_test(lambda df: df[['A', 'B']], df)
 
-    self._run_test(lambda df: df[:], df, distributed=True)
-    self._run_test(lambda df: df[df.A < 10], df, distributed=True)
+    self._run_test(lambda df: df[:], df)
+    self._run_test(lambda df: df[df.A < 10], df)
 
     df.index = df.index.map(float)
-    self._run_test(lambda df: df[1.5:4], df, distributed=True)
+    self._run_test(lambda df: df[1.5:4], df)
 
   def test_loc(self):
     dates = pd.date_range('1/1/2000', periods=8)
@@ -222,27 +216,23 @@ class DeferredFrameTest(unittest.TestCase):
     for s in [pd.Series([1, 2, 3]),
               pd.Series(range(100)),
               pd.Series([x**3 for x in range(-50, 50)])]:
-      self._run_test(lambda s: s.std(), s, distributed=True)
-      self._run_test(lambda s: s.corr(s), s, distributed=True)
-      self._run_test(lambda s: s.corr(s + 1), s, distributed=True)
-      self._run_test(lambda s: s.corr(s * s), s, distributed=True)
-      self._run_test(lambda s: s.cov(s * s), s, distributed=True)
+      self._run_test(lambda s: s.std(), s)
+      self._run_test(lambda s: s.corr(s), s)
+      self._run_test(lambda s: s.corr(s + 1), s)
+      self._run_test(lambda s: s.corr(s * s), s)
+      self._run_test(lambda s: s.cov(s * s), s)
 
   def test_dataframe_cov_corr(self):
     df = pd.DataFrame(np.random.randn(20, 3), columns=['a', 'b', 'c'])
     df.loc[df.index[:5], 'a'] = np.nan
     df.loc[df.index[5:10], 'b'] = np.nan
-    self._run_test(lambda df: df.corr().round(8), df, distributed=True)
-    self._run_test(lambda df: df.cov().round(8), df, distributed=True)
+    self._run_test(lambda df: df.corr().round(8), df)
+    self._run_test(lambda df: df.cov().round(8), df)
+    self._run_test(lambda df: df.corr(min_periods=12).round(8), df)
+    self._run_test(lambda df: df.cov(min_periods=12).round(8), df)
+    self._run_test(lambda df: df.corrwith(df.a).round(8), df)
     self._run_test(
-        lambda df: df.corr(min_periods=12).round(8), df, distributed=True)
-    self._run_test(
-        lambda df: df.cov(min_periods=12).round(8), df, distributed=True)
-    self._run_test(lambda df: df.corrwith(df.a).round(8), df, distributed=True)
-    self._run_test(
-        lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]).round(8),
-        df,
-        distributed=True)
+        lambda df: df[['a', 'b']].corrwith(df[['b', 'c']]).round(8), df)
 
   def test_categorical_groupby(self):
     df = pd.DataFrame({'A': np.arange(6), 'B': list('aabbca')})
@@ -251,9 +241,8 @@ class DeferredFrameTest(unittest.TestCase):
     # TODO(BEAM-11190): These aggregations can be done in index partitions, but
     # it will require a little more complex logic
     with beam.dataframe.allow_non_parallel_operations():
-      self._run_test(lambda df: df.groupby(level=0).sum(), df, distributed=True)
-      self._run_test(
-          lambda df: df.groupby(level=0).mean(), df, distributed=True)
+      self._run_test(lambda df: df.groupby(level=0).sum(), df)
+      self._run_test(lambda df: df.groupby(level=0).mean(), df)
 
 
 class AllowNonParallelTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index a40f614..9afc21a 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -237,6 +237,8 @@ class DoctestTest(unittest.TestCase):
                 'ser.groupby(["a", "b", "a", "b"]).mean()',
                 'ser.groupby(["a", "b", "a", np.nan]).mean()',
                 'ser.groupby(["a", "b", "a", np.nan], dropna=False).mean()',
+                # Grouping by a series is not supported
+                'ser.groupby(ser > 100).mean()',
             ],
             'pandas.core.series.Series.reindex': ['*'],
         },
diff --git a/sdks/python/apache_beam/dataframe/partitionings.py b/sdks/python/apache_beam/dataframe/partitionings.py
index 53454b1..9baf9c9 100644
--- a/sdks/python/apache_beam/dataframe/partitionings.py
+++ b/sdks/python/apache_beam/dataframe/partitionings.py
@@ -115,6 +115,26 @@ class Index(Partitioning):
     for key in range(num_partitions):
       yield key, df[hashes % num_partitions == key]
 
+  def check(self, dfs):
+    # TODO(BEAM-11324): This check should be stronger, it should verify that
+    # running partition_fn on the concatenation of dfs yields the same
+    # partitions.
+    if self._levels is None:
+
+      def get_index_set(df):
+        return set(df.index)
+    else:
+
+      def get_index_set(df):
+        return set(zip(df.index.level[level] for level in self._levels))
+
+    index_sets = [get_index_set(df) for df in dfs]
+    for i, index_set in enumerate(index_sets[:-1]):
+      if not index_set.isdisjoint(set.union(*index_sets[i + 1:])):
+        return False
+
+    return True
+
 
 class Singleton(Partitioning):
   """A partitioning of all the data into a single partition.
@@ -134,6 +154,9 @@ class Singleton(Partitioning):
   def partition_fn(self, df, num_partitions):
     yield None, df
 
+  def check(self, dfs):
+    return len(dfs) <= 1
+
 
 class Nothing(Partitioning):
   """A partitioning imposing no constraints on the actual partitioning.
@@ -162,3 +185,6 @@ class Nothing(Partitioning):
     part = pd.Series(shuffled(range(len(df))), index=df.index) % num_partitions
     for k in range(num_partitions):
       yield k, df[part == k]
+
+  def check(self, dfs):
+    return True