You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2022/07/10 15:50:58 UTC

[beam] branch master updated: Parallelizable DataFrame/Series mean (#22174)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 262f2b7f91a Parallelizable DataFrame/Series mean (#22174)
262f2b7f91a is described below

commit 262f2b7f91ac879cb8921a3e7d59d0315c9df9c4
Author: Brian Hulette <bh...@google.com>
AuthorDate: Sun Jul 10 08:50:52 2022 -0700

    Parallelizable DataFrame/Series mean (#22174)
---
 sdks/python/apache_beam/dataframe/frames.py      | 13 +++++-
 sdks/python/apache_beam/dataframe/frames_test.py | 50 ++++++++++++------------
 2 files changed, 38 insertions(+), 25 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 4f47efdad06..88557edf752 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -136,6 +136,7 @@ HAND_IMPLEMENTED_GLOBAL_AGGREGATIONS = {
     'quantile',
     'std',
     'var',
+    'mean',
     'nunique',
     'corr',
     'cov',
@@ -1577,6 +1578,17 @@ class DeferredSeries(DeferredDataFrameOrSeries):
     # Compute variance (deferred scalar) with same args, then sqrt it
     return self.var(*args, **kwargs).apply(lambda var: math.sqrt(var))
 
+  @frame_base.with_docs_from(pd.Series)
+  @frame_base.args_to_kwargs(pd.Series)
+  @frame_base.populate_defaults(pd.Series)
+  def mean(self, skipna, **kwargs):
+    if skipna:
+      size = self.count()
+    else:
+      size = self.length()
+
+    return self.sum(skipna=skipna, **kwargs) / size
+
   @frame_base.with_docs_from(pd.Series)
   @frame_base.args_to_kwargs(pd.Series)
   @frame_base.populate_defaults(pd.Series)
@@ -2068,7 +2080,6 @@ class DeferredSeries(DeferredDataFrameOrSeries):
   max = _agg_method(pd.Series, 'max')
   prod = product = _agg_method(pd.Series, 'prod')
   sum = _agg_method(pd.Series, 'sum')
-  mean = _agg_method(pd.Series, 'mean')
   median = _agg_method(pd.Series, 'median')
   sem = _agg_method(pd.Series, 'sem')
   mad = _agg_method(pd.Series, 'mad')
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index f3ce6b402d3..693ceb659b4 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -1924,8 +1924,7 @@ class AggregationTest(_AbstractFrameTest):
   def test_series_agg(self, agg_method):
     s = pd.Series(list(range(16)))
 
-    nonparallel = agg_method in (
-        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')
+    nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad')
 
     # TODO(https://github.com/apache/beam/issues/20926): max and min produce
     # the wrong proxy
@@ -1944,8 +1943,7 @@ class AggregationTest(_AbstractFrameTest):
   def test_series_agg_method(self, agg_method):
     s = pd.Series(list(range(16)))
 
-    nonparallel = agg_method in (
-        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')
+    nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad')
 
     # TODO(https://github.com/apache/beam/issues/20926): max and min produce
     # the wrong proxy
@@ -1961,8 +1959,7 @@ class AggregationTest(_AbstractFrameTest):
   def test_dataframe_agg(self, agg_method):
     df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
 
-    nonparallel = agg_method in (
-        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')
+    nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad')
 
     # TODO(https://github.com/apache/beam/issues/20926): max and min produce
     # the wrong proxy
@@ -1979,8 +1976,7 @@ class AggregationTest(_AbstractFrameTest):
   def test_dataframe_agg_method(self, agg_method):
     df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
 
-    nonparallel = agg_method in (
-        'quantile', 'mean', 'describe', 'median', 'sem', 'mad')
+    nonparallel = agg_method in ('quantile', 'describe', 'median', 'sem', 'mad')
 
     # TODO(https://github.com/apache/beam/issues/20926): max and min produce
     # the wrong proxy
@@ -1996,27 +1992,18 @@ class AggregationTest(_AbstractFrameTest):
     s = pd.Series(list(range(16)))
     self._run_test(lambda s: s.agg('sum'), s)
     self._run_test(lambda s: s.agg(['sum']), s)
-    self._run_test(lambda s: s.agg(['sum', 'mean']), s, nonparallel=True)
-    self._run_test(lambda s: s.agg(['mean']), s, nonparallel=True)
-    self._run_test(lambda s: s.agg('mean'), s, nonparallel=True)
+    self._run_test(lambda s: s.agg(['sum', 'mean']), s)
+    self._run_test(lambda s: s.agg(['mean']), s)
+    self._run_test(lambda s: s.agg('mean'), s)
 
   def test_dataframe_agg_modes(self):
     df = pd.DataFrame({'A': [1, 2, 3, 4], 'B': [2, 3, 5, 7]})
     self._run_test(lambda df: df.agg('sum'), df)
-    self._run_test(lambda df: df.agg(['sum', 'mean']), df, nonparallel=True)
+    self._run_test(lambda df: df.agg(['sum', 'mean']), df)
     self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'sum'}), df)
-    self._run_test(
-        lambda df: df.agg({
-            'A': 'sum', 'B': 'mean'
-        }), df, nonparallel=True)
-    self._run_test(
-        lambda df: df.agg({'A': ['sum', 'mean']}), df, nonparallel=True)
-    self._run_test(
-        lambda df: df.agg({
-            'A': ['sum', 'mean'], 'B': 'min'
-        }),
-        df,
-        nonparallel=True)
+    self._run_test(lambda df: df.agg({'A': 'sum', 'B': 'mean'}), df)
+    self._run_test(lambda df: df.agg({'A': ['sum', 'mean']}), df)
+    self._run_test(lambda df: df.agg({'A': ['sum', 'mean'], 'B': 'min'}), df)
 
   def test_series_agg_level(self):
     self._run_test(
@@ -2090,6 +2077,21 @@ class AggregationTest(_AbstractFrameTest):
                                                           level=0),
         GROUPBY_DF)
 
+  def test_series_mean_skipna(self):
+    df = pd.DataFrame({
+        'one': [i if i % 8 == 0 else np.nan for i in range(8)],
+        'two': [i if i % 4 == 0 else np.nan for i in range(8)],
+        'three': [i if i % 2 == 0 else np.nan for i in range(8)],
+    })
+
+    self._run_test(lambda df: df.one.mean(skipna=False), df)
+    self._run_test(lambda df: df.two.mean(skipna=False), df)
+    self._run_test(lambda df: df.three.mean(skipna=False), df)
+
+    self._run_test(lambda df: df.one.mean(skipna=True), df)
+    self._run_test(lambda df: df.two.mean(skipna=True), df)
+    self._run_test(lambda df: df.three.mean(skipna=True), df)
+
   def test_dataframe_agg_multifunc_level(self):
     # level= is ignored for multiple agg fns
     self._run_test(