You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lo...@apache.org on 2020/11/20 19:55:31 UTC
[beam] branch release-2.26.0 updated: [release-2.26.0][BEAM-11303][BEAM-11304] Fix some incorrect result bugs with size aggregation (#13386)
This is an automated email from the ASF dual-hosted git repository.
lostluck pushed a commit to branch release-2.26.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.26.0 by this push:
new ba122ad [release-2.26.0][BEAM-11303][BEAM-11304] Fix some incorrect result bugs with size aggregation (#13386)
ba122ad is described below
commit ba122ad63c09fb059320cab829e8d3f2636f82cf
Author: Brian Hulette <bh...@google.com>
AuthorDate: Fri Nov 20 11:54:39 2020 -0800
[release-2.26.0][BEAM-11303][BEAM-11304] Fix some incorrect result bugs with size aggregation (#13386)
* [BEAM-11303] Use sum as the post-agg for size (#13379)
  * Use sum to post-aggregate size
  * Add implementation for count (liftable with sum post-agg)
* [BEAM-11304] groupby(Series).size() on a filtered dataframe is incorrect (#13380)
---
sdks/python/apache_beam/dataframe/frames.py | 20 +++++++----
sdks/python/apache_beam/dataframe/frames_test.py | 44 +++++++++++++++++++++---
2 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index d4d0843..4c3fb68 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -145,13 +145,13 @@ class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
def set_index(s, by):
df = pd.DataFrame(s)
- df, by = df.align(by, axis=0)
+ df, by = df.align(by, axis=0, join='inner')
return df.set_index(by).iloc[:, 0]
else:
def set_index(df, by): # type: ignore
- df, by = df.align(by, axis=0)
+ df, by = df.align(by, axis=0, join='inner')
return df.set_index(by)
to_group = expressions.ComputedExpression(
@@ -1495,9 +1495,14 @@ class DeferredGroupBy(frame_base.DeferredFrame):
groups = property(frame_base.wont_implement_method('non-deferred'))
-def _liftable_agg(meth):
+def _liftable_agg(meth, postagg_meth=None):
name, func = frame_base.name_and_func(meth)
+ if postagg_meth is None:
+ post_agg_name, post_agg_func = name, func
+ else:
+ post_agg_name, post_agg_func = frame_base.name_and_func(postagg_meth)
+
def wrapper(self, *args, **kwargs):
assert isinstance(self, DeferredGroupBy)
ungrouped = self._expr.args()[0]
@@ -1521,8 +1526,8 @@ def _liftable_agg(meth):
preserves_partition_by=partitionings.Singleton())
post_agg = expressions.ComputedExpression(
- 'post_combine_' + name,
- lambda df: func(
+ 'post_combine_' + post_agg_name,
+ lambda df: post_agg_func(
df.groupby(level=list(range(df.index.nlevels)), **groupby_kwargs),
**kwargs),
[pre_agg],
@@ -1561,11 +1566,14 @@ def _unliftable_agg(meth):
return wrapper
-LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'size', 'sum']
+LIFTABLE_AGGREGATIONS = ['all', 'any', 'max', 'min', 'prod', 'sum']
+LIFTABLE_WITH_SUM_AGGREGATIONS = ['size', 'count']
UNLIFTABLE_AGGREGATIONS = ['mean', 'median', 'std', 'var']
for meth in LIFTABLE_AGGREGATIONS:
setattr(DeferredGroupBy, meth, _liftable_agg(meth))
+for meth in LIFTABLE_WITH_SUM_AGGREGATIONS:
+ setattr(DeferredGroupBy, meth, _liftable_agg(meth, postagg_meth='sum'))
for meth in UNLIFTABLE_AGGREGATIONS:
setattr(DeferredGroupBy, meth, _unliftable_agg(meth))
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 4a5fed2..ef9281d 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -89,10 +89,46 @@ class DeferredFrameTest(unittest.TestCase):
self._run_test(new_column, df)
def test_groupby(self):
- df = pd.DataFrame({'group': ['a', 'a', 'a', 'b'], 'value': [1, 2, 3, 5]})
- self._run_test(lambda df: df.groupby('group').agg(sum), df)
- self._run_test(lambda df: df.groupby('group').sum(), df)
- self._run_test(lambda df: df.groupby('group').median(), df)
+ df = pd.DataFrame({
+ 'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
+ 'value': [None if i % 11 == 0 else i for i in range(100)]
+ })
+ self._run_test(
+ lambda df: df.groupby('group').agg(sum), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').sum(), df, distributed=True)
+ self._run_test(
+ lambda df: df.groupby('group').median(), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').size(), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').count(), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').max(), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').min(), df, distributed=True)
+ self._run_test(lambda df: df.groupby('group').mean(), df, distributed=True)
+
+ self._run_test(
+ lambda df: df[df.value > 30].groupby('group').sum(),
+ df,
+ distributed=True)
+ self._run_test(
+ lambda df: df[df.value > 30].groupby('group').mean(),
+ df,
+ distributed=True)
+ self._run_test(
+ lambda df: df[df.value > 30].groupby('group').size(),
+ df,
+ distributed=True)
+
+ self._run_test(
+ lambda df: df[df.value > 40].groupby(df.group).sum(),
+ df,
+ distributed=True)
+ self._run_test(
+ lambda df: df[df.value > 40].groupby(df.group).mean(),
+ df,
+ distributed=True)
+ self._run_test(
+ lambda df: df[df.value > 40].groupby(df.group).size(),
+ df,
+ distributed=True)
@unittest.skipIf(sys.version_info <= (3, ), 'differing signature')
def test_merge(self):