You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/08/26 03:43:09 UTC

[beam] 01/01: Revert "[BEAM-12764] Revert "Merge pull request #15165 from [BEAM-12593] Veri…"

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch revert-15385-rollback-dataframes
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 0766b5c05cab37f3b77af625507a7ffb4835ce53
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Aug 25 20:42:24 2021 -0700

    Revert "[BEAM-12764] Revert "Merge pull request #15165 from [BEAM-12593] Veri…"
---
 sdks/python/apache_beam/dataframe/frames.py        | 21 +++++---
 sdks/python/apache_beam/dataframe/frames_test.py   | 46 +++++++++++++++--
 .../apache_beam/dataframe/pandas_doctests_test.py  | 58 ++++++++++++++++++----
 sdks/python/setup.py                               |  2 +-
 4 files changed, 105 insertions(+), 22 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index b834d9c..45ae8c6 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -55,6 +55,9 @@ __all__ = [
     'DeferredDataFrame',
 ]
 
+# Get major, minor version
+PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
+
 
 def populate_not_implemented(pd_type):
   def wrapper(deferred_type):
@@ -1932,7 +1935,7 @@ class DeferredSeries(DeferredDataFrameOrSeries):
     else:
       column = self
 
-    result = column.groupby(column).size()
+    result = column.groupby(column, dropna=dropna).size()
 
     # groupby.size() names the index, which we don't need
     result.index.name = None
@@ -2392,8 +2395,8 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
     if func in ('quantile',):
       return getattr(self, func)(*args, axis=axis, **kwargs)
 
-    # Maps to a property, args are ignored
-    if func in ('size',):
+    # In pandas<1.3.0, maps to a property, args are ignored
+    if func in ('size',) and PD_VERSION < (1, 3):
       return getattr(self, func)
 
     # We also have specialized distributed implementations for these. They only
@@ -3392,7 +3395,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
 
   @frame_base.with_docs_from(pd.DataFrame)
   def value_counts(self, subset=None, sort=False, normalize=False,
-                   ascending=False):
+                   ascending=False, dropna=True):
     """``sort`` is ``False`` by default, and ``sort=True`` is not supported
     because it imposes an ordering on the dataset which likely will not be
     preserved."""
@@ -3403,10 +3406,16 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
           "ordering on the dataset which likely will not be preserved.",
           reason="order-sensitive")
     columns = subset or list(self.columns)
-    result = self.groupby(columns).size()
+
+    if dropna:
+      dropped = self.dropna()
+    else:
+      dropped = self
+
+    result = dropped.groupby(columns, dropna=dropna).size()
 
     if normalize:
-      return result/self.dropna().length()
+      return result/dropped.length()
     else:
       return result
 
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index c3972ad..a2703d8 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -25,7 +25,8 @@ from apache_beam.dataframe import expressions
 from apache_beam.dataframe import frame_base
 from apache_beam.dataframe import frames
 
-PD_VERSION = tuple(map(int, pd.__version__.split('.')))
+# Get major, minor version
+PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
 
 GROUPBY_DF = pd.DataFrame({
     'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
@@ -235,6 +236,17 @@ class DeferredFrameTest(_AbstractFrameTest):
     self._run_test(
         lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2)
 
+  @unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3")
+  def test_value_counts_dropna_false(self):
+    df = pd.DataFrame({
+        'first_name': ['John', 'Anne', 'John', 'Beth'],
+        'middle_name': ['Smith', pd.NA, pd.NA, 'Louise']
+    })
+    # TODO(BEAM-12495): Remove this assertRaises when the underlying bug in
+    # https://github.com/pandas-dev/pandas/issues/36470 is fixed.
+    with self.assertRaises(NotImplementedError):
+      self._run_test(lambda df: df.value_counts(dropna=False), df)
+
   def test_get_column(self):
     df = pd.DataFrame({
         'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
@@ -369,10 +381,15 @@ class DeferredFrameTest(_AbstractFrameTest):
         nonparallel=True)
 
   def test_combine_Series(self):
-    with expressions.allow_non_parallel_operations():
-      s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
-      s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
-      self._run_test(lambda s1, s2: s1.combine(s2, max), s1, s2)
+    s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
+    s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+    self._run_test(
+        lambda s1,
+        s2: s1.combine(s2, max),
+        s1,
+        s2,
+        nonparallel=True,
+        check_proxy=False)
 
   def test_combine_first_dataframe(self):
     df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
@@ -587,8 +604,27 @@ class DeferredFrameTest(_AbstractFrameTest):
     self._run_test(lambda df: df.value_counts(), df)
     self._run_test(lambda df: df.value_counts(normalize=True), df)
 
+    if PD_VERSION >= (1, 3):
+      # dropna=False is new in pandas 1.3
+      # TODO(BEAM-12495): Remove this assertRaises when the underlying bug
+      # in https://github.com/pandas-dev/pandas/issues/36470 is fixed.
+      with self.assertRaises(NotImplementedError):
+        self._run_test(lambda df: df.value_counts(dropna=False), df)
+
+    # Test the defaults.
     self._run_test(lambda df: df.num_wings.value_counts(), df)
     self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df)
+    self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df)
+
+    # Test the combination interactions.
+    for normalize in (True, False):
+      for dropna in (True, False):
+        self._run_test(
+            lambda df,
+            dropna=dropna,
+            normalize=normalize: df.num_wings.value_counts(
+                dropna=dropna, normalize=normalize),
+            df)
 
   def test_value_counts_does_not_support_sort(self):
     df = pd.DataFrame({
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index edc42f1..755e4e5 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -20,6 +20,7 @@ import unittest
 import pandas as pd
 
 from apache_beam.dataframe import doctests
+from apache_beam.dataframe.frames import PD_VERSION
 from apache_beam.dataframe.pandas_top_level_functions import _is_top_level_function
 
 
@@ -68,7 +69,8 @@ class DoctestTest(unittest.TestCase):
                 "df.replace(regex={r'^ba.$': 'new', 'foo': 'xyz'})"
             ],
             'pandas.core.generic.NDFrame.fillna': [
-                "df.fillna(method='ffill')",
+                'df.fillna(method=\'ffill\')',
+                'df.fillna(method="ffill")',
                 'df.fillna(value=values, limit=1)',
             ],
             'pandas.core.generic.NDFrame.sort_values': ['*'],
@@ -164,7 +166,8 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.frame.DataFrame.cumprod': ['*'],
             'pandas.core.frame.DataFrame.diff': ['*'],
             'pandas.core.frame.DataFrame.fillna': [
-                "df.fillna(method='ffill')",
+                'df.fillna(method=\'ffill\')',
+                'df.fillna(method="ffill")',
                 'df.fillna(value=values, limit=1)',
             ],
             'pandas.core.frame.DataFrame.items': ['*'],
@@ -237,6 +240,8 @@ class DoctestTest(unittest.TestCase):
                 # reindex not supported
                 's2 = s.reindex([1, 0, 2, 3])',
             ],
+            'pandas.core.frame.DataFrame.resample': ['*'],
+            'pandas.core.frame.DataFrame.values': ['*'],
         },
         not_implemented_ok={
             'pandas.core.frame.DataFrame.transform': [
@@ -244,6 +249,8 @@ class DoctestTest(unittest.TestCase):
                 # frames_test.py::DeferredFrameTest::test_groupby_transform_sum
                 "df.groupby('Date')['Data'].transform('sum')",
             ],
+            'pandas.core.frame.DataFrame.swaplevel': ['*'],
+            'pandas.core.frame.DataFrame.melt': ['*'],
             'pandas.core.frame.DataFrame.reindex_axis': ['*'],
             'pandas.core.frame.DataFrame.round': [
                 'df.round(decimals)',
@@ -267,6 +274,11 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.frame.DataFrame.set_index': [
                 "df.set_index([s, s**2])",
             ],
+
+            # TODO(BEAM-12495)
+            'pandas.core.frame.DataFrame.value_counts': [
+              'df.value_counts(dropna=False)'
+            ],
         },
         skip={
             # s2 created with reindex
@@ -274,6 +286,8 @@ class DoctestTest(unittest.TestCase):
                 'df.dot(s2)',
             ],
 
+            'pandas.core.frame.DataFrame.resample': ['df'],
+            'pandas.core.frame.DataFrame.asfreq': ['*'],
             # Throws NotImplementedError when modifying df
             'pandas.core.frame.DataFrame.axes': [
                 # Returns deferred index.
@@ -302,6 +316,14 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.frame.DataFrame.to_markdown': ['*'],
             'pandas.core.frame.DataFrame.to_parquet': ['*'],
 
+            # Raises right exception, but testing framework has matching issues.
+            # Tested in `frames_test.py`.
+            'pandas.core.frame.DataFrame.insert': [
+                'df',
+                'df.insert(1, "newcol", [99, 99])',
+                'df.insert(0, "col1", [100, 100], allow_duplicates=True)'
+            ],
+
             'pandas.core.frame.DataFrame.to_records': [
                 'df.index = df.index.rename("I")',
                 'index_dtypes = f"<S{df.index.str.len().max()}"', # 1.x
@@ -385,7 +407,8 @@ class DoctestTest(unittest.TestCase):
                 's.dot(arr)',  # non-deferred result
             ],
             'pandas.core.series.Series.fillna': [
-                "df.fillna(method='ffill')",
+                'df.fillna(method=\'ffill\')',
+                'df.fillna(method="ffill")',
                 'df.fillna(value=values, limit=1)',
             ],
             'pandas.core.series.Series.items': ['*'],
@@ -434,11 +457,11 @@ class DoctestTest(unittest.TestCase):
                 's.drop_duplicates()',
                 "s.drop_duplicates(keep='last')",
             ],
-            'pandas.core.series.Series.repeat': [
-                's.repeat([1, 2, 3])'
-            ],
             'pandas.core.series.Series.reindex': ['*'],
             'pandas.core.series.Series.autocorr': ['*'],
+            'pandas.core.series.Series.repeat': ['s.repeat([1, 2, 3])'],
+            'pandas.core.series.Series.resample': ['*'],
+            'pandas.core.series.Series': ['ser.iloc[0] = 999'],
         },
         not_implemented_ok={
             'pandas.core.series.Series.transform': [
@@ -451,8 +474,11 @@ class DoctestTest(unittest.TestCase):
                 'ser.groupby(["a", "b", "a", np.nan]).mean()',
                 'ser.groupby(["a", "b", "a", np.nan], dropna=False).mean()',
             ],
+            'pandas.core.series.Series.swaplevel' :['*']
         },
         skip={
+            # Relies on setting values with iloc
+            'pandas.core.series.Series': ['ser', 'r'],
             'pandas.core.series.Series.groupby': [
                 # TODO(BEAM-11393): This example requires aligning two series
                 # with non-unique indexes. It only works in pandas because
@@ -460,6 +486,7 @@ class DoctestTest(unittest.TestCase):
                 # alignment.
                 'ser.groupby(ser > 100).mean()',
             ],
+            'pandas.core.series.Series.asfreq': ['*'],
             # error formatting
             'pandas.core.series.Series.append': [
                 's1.append(s2, verify_integrity=True)',
@@ -491,12 +518,12 @@ class DoctestTest(unittest.TestCase):
                 # Inspection after modification.
                 's'
             ],
+            'pandas.core.series.Series.resample': ['df'],
         })
     self.assertEqual(result.failed, 0)
 
   def test_string_tests(self):
-    PD_VERSION = tuple(int(v) for v in pd.__version__.split('.'))
-    if PD_VERSION < (1, 2, 0):
+    if PD_VERSION < (1, 2):
       module = pd.core.strings
     else:
       # Definitions were moved to accessor in pandas 1.2.0
@@ -668,11 +695,13 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.groupby.generic.SeriesGroupBy.diff': ['*'],
             'pandas.core.groupby.generic.DataFrameGroupBy.hist': ['*'],
             'pandas.core.groupby.generic.DataFrameGroupBy.fillna': [
-                "df.fillna(method='ffill')",
+                'df.fillna(method=\'ffill\')',
+                'df.fillna(method="ffill")',
                 'df.fillna(value=values, limit=1)',
             ],
             'pandas.core.groupby.generic.SeriesGroupBy.fillna': [
-                "df.fillna(method='ffill')",
+                'df.fillna(method=\'ffill\')',
+                'df.fillna(method="ffill")',
                 'df.fillna(value=values, limit=1)',
             ],
         },
@@ -682,6 +711,7 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.groupby.generic.SeriesGroupBy.transform': ['*'],
             'pandas.core.groupby.generic.SeriesGroupBy.idxmax': ['*'],
             'pandas.core.groupby.generic.SeriesGroupBy.idxmin': ['*'],
+            'pandas.core.groupby.generic.SeriesGroupBy.apply': ['*'],
         },
         skip={
             'pandas.core.groupby.generic.SeriesGroupBy.cov': [
@@ -698,6 +728,14 @@ class DoctestTest(unittest.TestCase):
             # These examples rely on grouping by a list
             'pandas.core.groupby.generic.SeriesGroupBy.aggregate': ['*'],
             'pandas.core.groupby.generic.DataFrameGroupBy.aggregate': ['*'],
+            'pandas.core.groupby.generic.SeriesGroupBy.transform': [
+                # Dropping invalid columns during a transform is unsupported.
+                'grouped.transform(lambda x: (x - x.mean()) / x.std())'
+            ],
+            'pandas.core.groupby.generic.DataFrameGroupBy.transform': [
+                # Dropping invalid columns during a transform is unsupported.
+                'grouped.transform(lambda x: (x - x.mean()) / x.std())'
+            ],
         })
     self.assertEqual(result.failed, 0)
 
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index f4e02b8..338251d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -166,7 +166,7 @@ if sys.platform == 'win32' and sys.maxsize <= 2**32:
 REQUIRED_TEST_PACKAGES = [
     'freezegun>=0.3.12',
     'mock>=1.0.1,<3.0.0',
-    'pandas>=1.0,<1.3.0',
+    'pandas>=1.0,<1.4.0',
     'parameterized>=0.7.1,<0.8.0',
     'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
     'pyyaml>=3.12,<6.0.0',