You are viewing a plain-text version of this content. The "here" hyperlink to the canonical version is not preserved in plain text.
Posted to commits@beam.apache.org by bh...@apache.org on 2021/10/15 21:54:07 UTC

[beam] branch master updated: [BEAM-12564] Implement Series.hasnans (#15729)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 72b861d  [BEAM-12564] Implement Series.hasnans (#15729)
72b861d is described below

commit 72b861dab90126f3a61785a259d514d1e969973d
Author: Benjamin Gonzalez <74...@users.noreply.github.com>
AuthorDate: Fri Oct 15 16:53:08 2021 -0500

    [BEAM-12564] Implement Series.hasnans (#15729)
    
    * [BEAM-12564] Initial implementation hasnans
    
    * [BEAM-12564] Add Series.hasnans to DataFrame API
    
    * [BEAM-12564] Fix mypy warnings
    
    * [BEAM-12564] Renaming and fixing the combine function in Series.hasnans
---
 sdks/python/apache_beam/dataframe/frames.py      | 17 +++++++++++++++++
 sdks/python/apache_beam/dataframe/frames_test.py |  9 +++++++++
 2 files changed, 26 insertions(+)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index bc8a888..e0263ca 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -1100,6 +1100,23 @@ class DeferredSeries(DeferredDataFrameOrSeries):
 
   @property  # type: ignore
   @frame_base.with_docs_from(pd.Series)
+  def hasnans(self):  # per-partition NaN check, then OR-ed together
+    has_nans = expressions.ComputedExpression(
+        'hasnans',
+        lambda s: pd.Series(s.hasnans), [self._expr],  # 1-elem bool Series
+        requires_partition_by=partitionings.Arbitrary(),  # any split works
+        preserves_partition_by=partitionings.Singleton())
+
+    with expressions.allow_non_parallel_operations():
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'combine_hasnans',  # stage 2: single-partition combine
+              lambda s: s.any(), [has_nans],  # True if any partition had NaN
+              requires_partition_by=partitionings.Singleton(),
+              preserves_partition_by=partitionings.Singleton()))
+
+  @property  # type: ignore
+  @frame_base.with_docs_from(pd.Series)  # dtype read eagerly from proxy
   def dtype(self):
     return self._expr.proxy().dtype
 
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index a2703d8..2c5ee86 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -661,6 +661,15 @@ class DeferredFrameTest(_AbstractFrameTest):
   def test_series_is_unique(self, series):
     self._run_test(lambda s: s.is_unique, series)
 
+  @parameterized.expand([
+      (pd.Series(range(10)), ),  # ints, no missing values -> False
+      (pd.Series([1, 2, np.nan, 3, np.nan]), ),  # float NaNs -> True
+      (pd.Series(['a', 'b', 'c', 'd', 'e']), ),  # object, no missing -> False
+      (pd.Series(['a', 'b', None, 'c', None]), ),  # None in object -> True
+  ])
+  def test_series_hasnans(self, series):
+    self._run_test(lambda s: s.hasnans, series)
+
   def test_dataframe_getitem(self):
     df = pd.DataFrame({'A': [x**2 for x in range(6)], 'B': list('abcdef')})
     self._run_test(lambda df: df['A'], df)