You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/11/20 19:55:31 UTC

[beam] branch release-2.26.0 updated: [release-2.26.0][BEAM-11303][BEAM-11304] Fix some incorrect result bugs with size aggregation (#13386)

This is an automated email from the ASF dual-hosted git repository.

lostluck pushed a commit to branch release-2.26.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.26.0 by this push:
     new ba122ad  [release-2.26.0][BEAM-11303][BEAM-11304] Fix some incorrect result bugs with size aggregation (#13386)
ba122ad is described below

commit ba122ad63c09fb059320cab829e8d3f2636f82cf
Author: Brian Hulette <bh...@google.com>
AuthorDate: Fri Nov 20 11:54:39 2020 -0800

    [release-2.26.0][BEAM-11303][BEAM-11304] Fix some incorrect result bugs with size aggregation (#13386)
    
    * [BEAM-11303] Use sum as the post-agg for size (#13379)
    
    * Use sum to post agg size
    
    * Add implementation for count (liftable with sum post-agg)
    
    * [BEAM-11304] groupby(Series).size() on a filtered dataframe is incorrect (#13380)
---
 sdks/python/apache_beam/dataframe/frames.py      | 20 +++++++----
 sdks/python/apache_beam/dataframe/frames_test.py | 44 +++++++++++++++++++++---
 2 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index d4d0843..4c3fb68 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -145,13 +145,13 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
 
         def set_index(s, by):
           df = pd.DataFrame(s)
-          df, by = df.align(by, axis=0)
+          df, by = df.align(by, axis=0, join='inner')
           return df.set_index(by).iloc[:, 0]
 
       else:
 
         def set_index(df, by):  # type: ignore
-          df, by = df.align(by, axis=0)
+          df, by = df.align(by, axis=0, join='inner')
           return df.set_index(by)
 
       to_group = expressions.ComputedExpression(
@@ -1495,9 +1495,14 @@ class DeferredGroupBy(frame_base.DeferredFrame):
   groups = property(frame_base.wont_implement_method('non-deferred'))
 
 
-def _liftable_agg(meth):
+def _liftable_agg(meth, postagg_meth=None):
   name, func = frame_base.name_and_func(meth)
 
+  if postagg_meth is None:
+    post_agg_name, post_agg_func = name, func
+  else:
+    post_agg_name, post_agg_func = frame_base.name_and_func(postagg_meth)
+
   def wrapper(self, *args, **kwargs):
     assert isinstance(self, DeferredGroupBy)
     ungrouped = self._expr.args()[0]
@@ -1521,8 +1526,8 @@ def _liftable_agg(meth):
         preserves_partition_by=partitionings.Singleton())
 
     post_agg = expressions.ComputedExpression(
-        'post_combine_' + name,
-        lambda df: func(
+        'post_combine_' + post_agg_name,
+        lambda df: post_agg_func(
             df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
             **kwargs),
         [pre_agg],
@@ -1561,11 +1566,14 @@ def _unliftable_agg(meth):
 
   return wrapper
 
-LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'size', 'sum']
+LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
+LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
 UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']
 
 for meth in LIFTABLE_AGGREGATIONS:
   setattr(DeferredGroupBy, meth, _liftable_agg(meth))
+for meth in LIFTABLE_WITH_SUM_AGGREGATIONS:
+  setattr(DeferredGroupBy, meth, _liftable_agg(meth, postagg_meth='sum'))
 for meth in UNLIFTABLE_AGGREGATIONS:
   setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
 
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 4a5fed2..ef9281d 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -89,10 +89,46 @@ class DeferredFrameTest(unittest.TestCase):
     self._run_test(new_column, df)
 
   def test_groupby(self):
-    df = pd.DataFrame({'group': ['a', 'a', 'a', 'b'], 'value': [1, 2, 3, 5]})
-    self._run_test(lambda df: df.groupby('group').agg(sum), df)
-    self._run_test(lambda df: df.groupby('group').sum(), df)
-    self._run_test(lambda df: df.groupby('group').median(), df)
+    df = pd.DataFrame({
+        'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
+        'value': [None if i % 11 == 0 else i for i in range(100)]
+    })
+    self._run_test(
+        lambda df: df.groupby('group').agg(sum), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').sum(), df, distributed=True)
+    self._run_test(
+        lambda df: df.groupby('group').median(), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').size(), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').count(), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').max(), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').min(), df, distributed=True)
+    self._run_test(lambda df: df.groupby('group').mean(), df, distributed=True)
+
+    self._run_test(
+        lambda df: df[df.value > 30].groupby('group').sum(),
+        df,
+        distributed=True)
+    self._run_test(
+        lambda df: df[df.value > 30].groupby('group').mean(),
+        df,
+        distributed=True)
+    self._run_test(
+        lambda df: df[df.value > 30].groupby('group').size(),
+        df,
+        distributed=True)
+
+    self._run_test(
+        lambda df: df[df.value > 40].groupby(df.group).sum(),
+        df,
+        distributed=True)
+    self._run_test(
+        lambda df: df[df.value > 40].groupby(df.group).mean(),
+        df,
+        distributed=True)
+    self._run_test(
+        lambda df: df[df.value > 40].groupby(df.group).size(),
+        df,
+        distributed=True)
 
   @unittest.skipIf(sys.version_info <= (3, ), 'differing signature')
   def test_merge(self):