You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@beam.apache.org by ch...@apache.org on 2021/08/26 03:43:09 UTC
[beam] 01/01: Revert "[BEAM-12764] Revert "Merge pull request #15165 from [BEAM-12593] Veri…"
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch revert-15385-rollback-dataframes
in repository https://gitbox.apache.org/repos/asf/beam.git
commit 0766b5c05cab37f3b77af625507a7ffb4835ce53
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Wed Aug 25 20:42:24 2021 -0700
Revert "[BEAM-12764] Revert "Merge pull request #15165 from [BEAM-12593] Veri…"
---
sdks/python/apache_beam/dataframe/frames.py | 21 +++++---
sdks/python/apache_beam/dataframe/frames_test.py | 46 +++++++++++++++--
.../apache_beam/dataframe/pandas_doctests_test.py | 58 ++++++++++++++++++----
sdks/python/setup.py | 2 +-
4 files changed, 105 insertions(+), 22 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index b834d9c..45ae8c6 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -55,6 +55,9 @@ __all__ = [
'DeferredDataFrame',
]
+# Get major, minor version
+PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
+
def populate_not_implemented(pd_type):
def wrapper(deferred_type):
@@ -1932,7 +1935,7 @@ class DeferredSeries(DeferredDataFrameOrSeries):
else:
column = self
- result = column.groupby(column).size()
+ result = column.groupby(column, dropna=dropna).size()
# groupby.size() names the index, which we don't need
result.index.name = None
@@ -2392,8 +2395,8 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
if func in ('quantile',):
return getattr(self, func)(*args, axis=axis, **kwargs)
- # Maps to a property, args are ignored
- if func in ('size',):
+ # In pandas<1.3.0, maps to a property, args are ignored
+ if func in ('size',) and PD_VERSION < (1, 3):
return getattr(self, func)
# We also have specialized distributed implementations for these. They only
@@ -3392,7 +3395,7 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
@frame_base.with_docs_from(pd.DataFrame)
def value_counts(self, subset=None, sort=False, normalize=False,
- ascending=False):
+ ascending=False, dropna=True):
"""``sort`` is ``False`` by default, and ``sort=True`` is not supported
because it imposes an ordering on the dataset which likely will not be
preserved."""
@@ -3403,10 +3406,16 @@ class DeferredDataFrame(DeferredDataFrameOrSeries):
"ordering on the dataset which likely will not be preserved.",
reason="order-sensitive")
columns = subset or list(self.columns)
- result = self.groupby(columns).size()
+
+ if dropna:
+ dropped = self.dropna()
+ else:
+ dropped = self
+
+ result = dropped.groupby(columns, dropna=dropna).size()
if normalize:
- return result/self.dropna().length()
+ return result/dropped.length()
else:
return result
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index c3972ad..a2703d8 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -25,7 +25,8 @@ from apache_beam.dataframe import expressions
from apache_beam.dataframe import frame_base
from apache_beam.dataframe import frames
-PD_VERSION = tuple(map(int, pd.__version__.split('.')))
+# Get major, minor version
+PD_VERSION = tuple(map(int, pd.__version__.split('.')[0:2]))
GROUPBY_DF = pd.DataFrame({
'group': ['a' if i % 5 == 0 or i % 3 == 0 else 'b' for i in range(100)],
@@ -235,6 +236,17 @@ class DeferredFrameTest(_AbstractFrameTest):
self._run_test(
lambda df, df2: df.subtract(2).multiply(df2).divide(df), df, df2)
+ @unittest.skipIf(PD_VERSION < (1, 3), "dropna=False is new in pandas 1.3")
+ def test_value_counts_dropna_false(self):
+ df = pd.DataFrame({
+ 'first_name': ['John', 'Anne', 'John', 'Beth'],
+ 'middle_name': ['Smith', pd.NA, pd.NA, 'Louise']
+ })
+ # TODO(BEAM-12495): Remove the assertRaises this when the underlying bug in
+ # https://github.com/pandas-dev/pandas/issues/36470 is fixed.
+ with self.assertRaises(NotImplementedError):
+ self._run_test(lambda df: df.value_counts(dropna=False), df)
+
def test_get_column(self):
df = pd.DataFrame({
'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
@@ -369,10 +381,15 @@ class DeferredFrameTest(_AbstractFrameTest):
nonparallel=True)
def test_combine_Series(self):
- with expressions.allow_non_parallel_operations():
- s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
- s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
- self._run_test(lambda s1, s2: s1.combine(s2, max), s1, s2)
+ s1 = pd.Series({'falcon': 330.0, 'eagle': 160.0})
+ s2 = pd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+ self._run_test(
+ lambda s1,
+ s2: s1.combine(s2, max),
+ s1,
+ s2,
+ nonparallel=True,
+ check_proxy=False)
def test_combine_first_dataframe(self):
df1 = pd.DataFrame({'A': [None, 0], 'B': [None, 4]})
@@ -587,8 +604,27 @@ class DeferredFrameTest(_AbstractFrameTest):
self._run_test(lambda df: df.value_counts(), df)
self._run_test(lambda df: df.value_counts(normalize=True), df)
+ if PD_VERSION >= (1, 3):
+ # dropna=False is new in pandas 1.3
+ # TODO(BEAM-12495): Remove the assertRaises this when the underlying bug
+ # in https://github.com/pandas-dev/pandas/issues/36470 is fixed.
+ with self.assertRaises(NotImplementedError):
+ self._run_test(lambda df: df.value_counts(dropna=False), df)
+
+ # Test the defaults.
self._run_test(lambda df: df.num_wings.value_counts(), df)
self._run_test(lambda df: df.num_wings.value_counts(normalize=True), df)
+ self._run_test(lambda df: df.num_wings.value_counts(dropna=False), df)
+
+ # Test the combination interactions.
+ for normalize in (True, False):
+ for dropna in (True, False):
+ self._run_test(
+ lambda df,
+ dropna=dropna,
+ normalize=normalize: df.num_wings.value_counts(
+ dropna=dropna, normalize=normalize),
+ df)
def test_value_counts_does_not_support_sort(self):
df = pd.DataFrame({
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index edc42f1..755e4e5 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -20,6 +20,7 @@ import unittest
import pandas as pd
from apache_beam.dataframe import doctests
+from apache_beam.dataframe.frames import PD_VERSION
from apache_beam.dataframe.pandas_top_level_functions import _is_top_level_function
@@ -68,7 +69,8 @@ class DoctestTest(unittest.TestCase):
"df.replace(regex={r'^ba.$': 'new', 'foo': 'xyz'})"
],
'pandas.core.generic.NDFrame.fillna': [
- "df.fillna(method='ffill')",
+ 'df.fillna(method=\'ffill\')',
+ 'df.fillna(method="ffill")',
'df.fillna(value=values, limit=1)',
],
'pandas.core.generic.NDFrame.sort_values': ['*'],
@@ -164,7 +166,8 @@ class DoctestTest(unittest.TestCase):
'pandas.core.frame.DataFrame.cumprod': ['*'],
'pandas.core.frame.DataFrame.diff': ['*'],
'pandas.core.frame.DataFrame.fillna': [
- "df.fillna(method='ffill')",
+ 'df.fillna(method=\'ffill\')',
+ 'df.fillna(method="ffill")',
'df.fillna(value=values, limit=1)',
],
'pandas.core.frame.DataFrame.items': ['*'],
@@ -237,6 +240,8 @@ class DoctestTest(unittest.TestCase):
# reindex not supported
's2 = s.reindex([1, 0, 2, 3])',
],
+ 'pandas.core.frame.DataFrame.resample': ['*'],
+ 'pandas.core.frame.DataFrame.values': ['*'],
},
not_implemented_ok={
'pandas.core.frame.DataFrame.transform': [
@@ -244,6 +249,8 @@ class DoctestTest(unittest.TestCase):
# frames_test.py::DeferredFrameTest::test_groupby_transform_sum
"df.groupby('Date')['Data'].transform('sum')",
],
+ 'pandas.core.frame.DataFrame.swaplevel': ['*'],
+ 'pandas.core.frame.DataFrame.melt': ['*'],
'pandas.core.frame.DataFrame.reindex_axis': ['*'],
'pandas.core.frame.DataFrame.round': [
'df.round(decimals)',
@@ -267,6 +274,11 @@ class DoctestTest(unittest.TestCase):
'pandas.core.frame.DataFrame.set_index': [
"df.set_index([s, s**2])",
],
+
+ # TODO(BEAM-12495)
+ 'pandas.core.frame.DataFrame.value_counts': [
+ 'df.value_counts(dropna=False)'
+ ],
},
skip={
# s2 created with reindex
@@ -274,6 +286,8 @@ class DoctestTest(unittest.TestCase):
'df.dot(s2)',
],
+ 'pandas.core.frame.DataFrame.resample': ['df'],
+ 'pandas.core.frame.DataFrame.asfreq': ['*'],
# Throws NotImplementedError when modifying df
'pandas.core.frame.DataFrame.axes': [
# Returns deferred index.
@@ -302,6 +316,14 @@ class DoctestTest(unittest.TestCase):
'pandas.core.frame.DataFrame.to_markdown': ['*'],
'pandas.core.frame.DataFrame.to_parquet': ['*'],
+ # Raises right exception, but testing framework has matching issues.
+ # Tested in `frames_test.py`.
+ 'pandas.core.frame.DataFrame.insert': [
+ 'df',
+ 'df.insert(1, "newcol", [99, 99])',
+ 'df.insert(0, "col1", [100, 100], allow_duplicates=True)'
+ ],
+
'pandas.core.frame.DataFrame.to_records': [
'df.index = df.index.rename("I")',
'index_dtypes = f"<S{df.index.str.len().max()}"', # 1.x
@@ -385,7 +407,8 @@ class DoctestTest(unittest.TestCase):
's.dot(arr)', # non-deferred result
],
'pandas.core.series.Series.fillna': [
- "df.fillna(method='ffill')",
+ 'df.fillna(method=\'ffill\')',
+ 'df.fillna(method="ffill")',
'df.fillna(value=values, limit=1)',
],
'pandas.core.series.Series.items': ['*'],
@@ -434,11 +457,11 @@ class DoctestTest(unittest.TestCase):
's.drop_duplicates()',
"s.drop_duplicates(keep='last')",
],
- 'pandas.core.series.Series.repeat': [
- 's.repeat([1, 2, 3])'
- ],
'pandas.core.series.Series.reindex': ['*'],
'pandas.core.series.Series.autocorr': ['*'],
+ 'pandas.core.series.Series.repeat': ['s.repeat([1, 2, 3])'],
+ 'pandas.core.series.Series.resample': ['*'],
+ 'pandas.core.series.Series': ['ser.iloc[0] = 999'],
},
not_implemented_ok={
'pandas.core.series.Series.transform': [
@@ -451,8 +474,11 @@ class DoctestTest(unittest.TestCase):
'ser.groupby(["a", "b", "a", np.nan]).mean()',
'ser.groupby(["a", "b", "a", np.nan], dropna=False).mean()',
],
+ 'pandas.core.series.Series.swaplevel' :['*']
},
skip={
+ # Relies on setting values with iloc
+ 'pandas.core.series.Series': ['ser', 'r'],
'pandas.core.series.Series.groupby': [
# TODO(BEAM-11393): This example requires aligning two series
# with non-unique indexes. It only works in pandas because
@@ -460,6 +486,7 @@ class DoctestTest(unittest.TestCase):
# alignment.
'ser.groupby(ser > 100).mean()',
],
+ 'pandas.core.series.Series.asfreq': ['*'],
# error formatting
'pandas.core.series.Series.append': [
's1.append(s2, verify_integrity=True)',
@@ -491,12 +518,12 @@ class DoctestTest(unittest.TestCase):
# Inspection after modification.
's'
],
+ 'pandas.core.series.Series.resample': ['df'],
})
self.assertEqual(result.failed, 0)
def test_string_tests(self):
- PD_VERSION = tuple(int(v) for v in pd.__version__.split('.'))
- if PD_VERSION < (1, 2, 0):
+ if PD_VERSION < (1, 2):
module = pd.core.strings
else:
# Definitions were moved to accessor in pandas 1.2.0
@@ -668,11 +695,13 @@ class DoctestTest(unittest.TestCase):
'pandas.core.groupby.generic.SeriesGroupBy.diff': ['*'],
'pandas.core.groupby.generic.DataFrameGroupBy.hist': ['*'],
'pandas.core.groupby.generic.DataFrameGroupBy.fillna': [
- "df.fillna(method='ffill')",
+ 'df.fillna(method=\'ffill\')',
+ 'df.fillna(method="ffill")',
'df.fillna(value=values, limit=1)',
],
'pandas.core.groupby.generic.SeriesGroupBy.fillna': [
- "df.fillna(method='ffill')",
+ 'df.fillna(method=\'ffill\')',
+ 'df.fillna(method="ffill")',
'df.fillna(value=values, limit=1)',
],
},
@@ -682,6 +711,7 @@ class DoctestTest(unittest.TestCase):
'pandas.core.groupby.generic.SeriesGroupBy.transform': ['*'],
'pandas.core.groupby.generic.SeriesGroupBy.idxmax': ['*'],
'pandas.core.groupby.generic.SeriesGroupBy.idxmin': ['*'],
+ 'pandas.core.groupby.generic.SeriesGroupBy.apply': ['*'],
},
skip={
'pandas.core.groupby.generic.SeriesGroupBy.cov': [
@@ -698,6 +728,14 @@ class DoctestTest(unittest.TestCase):
# These examples rely on grouping by a list
'pandas.core.groupby.generic.SeriesGroupBy.aggregate': ['*'],
'pandas.core.groupby.generic.DataFrameGroupBy.aggregate': ['*'],
+ 'pandas.core.groupby.generic.SeriesGroupBy.transform': [
+ # Dropping invalid columns during a transform is unsupported.
+ 'grouped.transform(lambda x: (x - x.mean()) / x.std())'
+ ],
+ 'pandas.core.groupby.generic.DataFrameGroupBy.transform': [
+ # Dropping invalid columns during a transform is unsupported.
+ 'grouped.transform(lambda x: (x - x.mean()) / x.std())'
+ ],
})
self.assertEqual(result.failed, 0)
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index f4e02b8..338251d 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -166,7 +166,7 @@ if sys.platform == 'win32' and sys.maxsize <= 2**32:
REQUIRED_TEST_PACKAGES = [
'freezegun>=0.3.12',
'mock>=1.0.1,<3.0.0',
- 'pandas>=1.0,<1.3.0',
+ 'pandas>=1.0,<1.4.0',
'parameterized>=0.7.1,<0.8.0',
'pyhamcrest>=1.9,!=1.10.0,<2.0.0',
'pyyaml>=3.12,<6.0.0',