You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink on the word "here") was not preserved in this copy.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/09/23 22:24:57 UTC
[beam] branch master updated: [BEAM-9547] Roll forward #12858 (#12920)
This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3d67200 [BEAM-9547] Roll forward #12858 (#12920)
3d67200 is described below
commit 3d67200af0d7faf968682ac85b196699195905e1
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed Sep 23 15:24:32 2020 -0700
[BEAM-9547] Roll forward #12858 (#12920)
* [BEAM-9547] Raise NotImplementedError and WontImplementError throughout DeferredDataframe (#12858)
* Draft of NotImplementedErrors for DataFrame
* Default to JIRA issue BEAM-9547
* pivot raises NotImplementedError
* remove mistaken change to frames
* Don't use __ror__ to convert df to PCollection; it's already a DataFrame operator
---
sdks/python/apache_beam/dataframe/frame_base.py | 7 +
sdks/python/apache_beam/dataframe/frames.py | 118 +++++++++++++----
sdks/python/apache_beam/dataframe/io.py | 21 ++-
.../apache_beam/dataframe/pandas_doctests_test.py | 145 ++++++++++++++-------
4 files changed, 207 insertions(+), 84 deletions(-)
diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py
index 4a37f74..2d1f2a0 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -251,6 +251,13 @@ def wont_implement_method(msg):
return wrapper
+def not_implemented_method(op, jira='BEAM-9547'):
+ def wrapper(self, *args, **kwargs):
+ raise NotImplementedError("'%s' is not yet supported (%s)" % (op, jira))
+
+ return wrapper
+
+
def copy_and_mutate(func):
def wrapper(self, *args, **kwargs):
copy = self.copy()
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 5b1dd6d..88b4b79 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -70,6 +70,8 @@ class DeferredSeries(frame_base.DeferredFrame):
preserves_partition_by=partitionings.Singleton(),
requires_partition_by=partitionings.Nothing()))
+ reindex = frame_base.not_implemented_method('reindex')
+
to_numpy = to_string = frame_base.wont_implement_method('non-deferred value')
transform = frame_base._elementwise_method(
@@ -117,6 +119,11 @@ class DeferredSeries(frame_base.DeferredFrame):
head = tail = frame_base.wont_implement_method('order-sensitive')
+ memory_usage = frame_base.wont_implement_method('non-deferred value')
+
+ # In Series __contains__ checks the index
+ __contains__ = frame_base.wont_implement_method('non-deferred value')
+
@frame_base.args_to_kwargs(pd.Series)
@frame_base.populate_defaults(pd.Series)
def nlargest(self, keep, **kwargs):
@@ -226,29 +233,6 @@ class DeferredSeries(frame_base.DeferredFrame):
return _DeferredStringMethods(expr)
-for base in ['add',
- 'sub',
- 'mul',
- 'div',
- 'truediv',
- 'floordiv',
- 'mod',
- 'pow',
- 'and',
- 'or']:
- for p in ['%s', 'r%s', '__%s__', '__r%s__']:
- # TODO: non-trivial level?
- name = p % base
- setattr(
- DeferredSeries,
- name,
- frame_base._elementwise_method(name, restrictions={'level': None}))
- setattr(
- DeferredSeries,
- '__i%s__' % base,
- frame_base._elementwise_method('__i%s__' % base, inplace=True))
-for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
- setattr(DeferredSeries, name, frame_base._elementwise_method(name))
for name in ['apply', 'map', 'transform']:
setattr(DeferredSeries, name, frame_base._elementwise_method(name))
@@ -259,6 +243,10 @@ class DeferredDataFrame(frame_base.DeferredFrame):
def T(self):
return self.transpose()
+ @property
+ def columns(self):
+ return self._expr.proxy().columns
+
def groupby(self, by):
# TODO: what happens to the existing index?
# We set the columns to index as we have a notion of being partitioned by
@@ -280,6 +268,13 @@ class DeferredDataFrame(frame_base.DeferredFrame):
def __getitem__(self, key):
# TODO: Replicate pd.DataFrame.__getitem__ logic
+ if isinstance(key, frame_base.DeferredBase):
+ # Fail early if key is a DeferredBase as it interacts surprisingly with
+ # key in self._expr.proxy().columns
+ raise NotImplementedError(
+ "Indexing with a deferred frame is not yet supported. Consider "
+ "using df.loc[...]")
+
if (isinstance(key, list) and
all(key_column in self._expr.proxy().columns
for key_column in key)) or key in self._expr.proxy().columns:
@@ -287,6 +282,10 @@ class DeferredDataFrame(frame_base.DeferredFrame):
else:
raise NotImplementedError(key)
+ def __contains__(self, key):
+ # Checks if proxy has the given column
+ return self._expr.proxy().__contains__(key)
+
def __setitem__(self, key, value):
if isinstance(key, str):
# yapf: disable
@@ -314,13 +313,37 @@ class DeferredDataFrame(frame_base.DeferredFrame):
requires_partition_by=partitionings.Nothing(),
preserves_partition_by=partitionings.Nothing()))
- def at(self, *args, **kwargs):
- raise NotImplementedError()
+ at = frame_base.not_implemented_method('at')
@property
def loc(self):
return _DeferredLoc(self)
+ _get_index = _set_index = frame_base.not_implemented_method('index')
+ index = property(_get_index, _set_index)
+
+ @property
+ def axes(self):
+ return (self.index, self.columns)
+
+ apply = frame_base.not_implemented_method('apply')
+ explode = frame_base.not_implemented_method('explode')
+ isin = frame_base.not_implemented_method('isin')
+ assign = frame_base.not_implemented_method('assign')
+ append = frame_base.not_implemented_method('append')
+ combine = frame_base.not_implemented_method('combine')
+ combine_first = frame_base.not_implemented_method('combine_first')
+ cov = frame_base.not_implemented_method('cov')
+ corr = frame_base.not_implemented_method('corr')
+ count = frame_base.not_implemented_method('count')
+ dot = frame_base.not_implemented_method('dot')
+ drop = frame_base.not_implemented_method('drop')
+ eval = frame_base.not_implemented_method('eval')
+ reindex = frame_base.not_implemented_method('reindex')
+ melt = frame_base.not_implemented_method('melt')
+ pivot = frame_base.not_implemented_method('pivot')
+ pivot_table = frame_base.not_implemented_method('pivot_table')
+
def aggregate(self, func, axis=0, *args, **kwargs):
if axis is None:
# Aggregate across all elements by first aggregating across columns,
@@ -383,6 +406,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
applymap = frame_base._elementwise_method('applymap')
memory_usage = frame_base.wont_implement_method('non-deferred value')
+ info = frame_base.wont_implement_method('non-deferred value')
all = frame_base._agg_method('all')
any = frame_base._agg_method('any')
@@ -398,6 +422,8 @@ class DeferredDataFrame(frame_base.DeferredFrame):
def mode(self, axis=0, *args, **kwargs):
if axis == 1 or axis == 'columns':
+ # Number of columns is max(number mode values for each row), so we can't
+ # determine how many there will be before looking at the data.
raise frame_base.WontImplementError('non-deferred column values')
return frame_base.DeferredFrame.wrap(
expressions.ComputedExpression(
@@ -766,8 +792,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
transform = frame_base._elementwise_method(
'transform', restrictions={'axis': 0})
- def transpose(self, *args, **kwargs):
- raise frame_base.WontImplementError('non-deferred column values')
+ transpose = frame_base.wont_implement_method('non-deferred column values')
def unstack(self, *args, **kwargs):
if self._expr.proxy().index.nlevels == 1:
@@ -799,7 +824,10 @@ for meth in ('filter', ):
class DeferredGroupBy(frame_base.DeferredFrame):
def agg(self, fn):
if not callable(fn):
- raise NotImplementedError(fn)
+ # TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
+ # running doctests for pandas.core.groupby.generic
+ raise NotImplementedError('GroupBy.agg currently only supports callable '
+ 'arguments')
return DeferredDataFrame(
expressions.ComputedExpression(
'agg',
@@ -963,3 +991,37 @@ for method in ELEMENTWISE_STRING_METHODS:
setattr(_DeferredStringMethods,
method,
frame_base._elementwise_method(method))
+
+for base in ['add',
+ 'sub',
+ 'mul',
+ 'div',
+ 'truediv',
+ 'floordiv',
+ 'mod',
+ 'pow',
+ 'and',
+ 'or']:
+ for p in ['%s', 'r%s', '__%s__', '__r%s__']:
+ # TODO: non-trivial level?
+ name = p % base
+ setattr(
+ DeferredSeries,
+ name,
+ frame_base._elementwise_method(name, restrictions={'level': None}))
+ setattr(
+ DeferredDataFrame,
+ name,
+ frame_base._elementwise_method(name, restrictions={'level': None}))
+ setattr(
+ DeferredSeries,
+ '__i%s__' % base,
+ frame_base._elementwise_method('__i%s__' % base, inplace=True))
+ setattr(
+ DeferredDataFrame,
+ '__i%s__' % base,
+ frame_base._elementwise_method('__i%s__' % base, inplace=True))
+
+for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
+ setattr(DeferredSeries, name, frame_base._elementwise_method(name))
+ setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 55c549a..2fc944d 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -40,8 +40,14 @@ def read_csv(path, *args, **kwargs):
return _ReadFromPandas(pd.read_csv, path, args, kwargs, incremental=True)
+def _as_pc(df):
+ from apache_beam.dataframe import convert # avoid circular import
+ # TODO(roberwb): Amortize the computation for multiple writes?
+ return convert.to_pcollection(df, yield_elements='pandas')
+
+
def to_csv(df, path, *args, **kwargs):
- return df | _WriteToPandas(
+ return _as_pc(df) | _WriteToPandas(
pd.DataFrame.to_csv, path, args, kwargs, incremental=True, binary=False)
@@ -68,7 +74,7 @@ def to_json(df, path, orient=None, *args, **kwargs):
else:
raise frame_base.WontImplementError('not dataframes or series')
kwargs['orient'] = orient
- return df | _WriteToPandas(
+ return _as_pc(df) | _WriteToPandas(
pd.DataFrame.to_json,
path,
args,
@@ -87,7 +93,7 @@ def read_html(path, *args, **kwargs):
def to_html(df, path, *args, **kwargs):
- return df | _WriteToPandas(
+ return _as_pc(df) | _WriteToPandas(
pd.DataFrame.to_html,
path,
args,
@@ -109,7 +115,7 @@ def _binary_writer(format):
lambda df,
path,
*args,
- **kwargs: df | _WriteToPandas(func, path, args, kwargs))
+ **kwargs: _as_pc(df) | _WriteToPandas(func, path, args, kwargs))
for format in ('excel', 'feather', 'parquet', 'stata'):
@@ -214,13 +220,6 @@ class _WriteToPandas(beam.PTransform):
self.incremental = incremental
self.binary = binary
- def __ror__(self, other, label=None):
- if isinstance(other, frame_base.DeferredBase):
- from apache_beam.dataframe import convert # avoid circular import
- # TODO(roberwb): Amortize the computation for multiple writes?
- other = convert.to_pcollection(other, yield_elements='pandas')
- return super(_WriteToPandas, self).__ror__(other, label)
-
def expand(self, pcoll):
dir, name = io.filesystems.FileSystems.split(self.path)
return pcoll | fileio.WriteToFiles(
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index 432a016..abc2b7d 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -68,32 +68,65 @@ class DoctestTest(unittest.TestCase):
],
'pandas.core.frame.DataFrame.unstack': ['*'],
'pandas.core.frame.DataFrame.memory_usage': ['*'],
- },
- skip={
- 'pandas.core.frame.DataFrame.T': [
- 'df1_transposed.dtypes', 'df2_transposed.dtypes'
+ 'pandas.core.frame.DataFrame.info': ['*'],
+ # Not equal to df.agg('mode', axis='columns', numeric_only=True)
+ # because there can be multiple columns if a row has more than one
+ # mode
+ 'pandas.core.frame.DataFrame.mode': [
+ "df.mode(axis='columns', numeric_only=True)"
],
- 'pandas.core.frame.DataFrame.agg': ['*'],
- 'pandas.core.frame.DataFrame.aggregate': ['*'],
- 'pandas.core.frame.DataFrame.append': ['*'],
- 'pandas.core.frame.DataFrame.apply': ['*'],
- 'pandas.core.frame.DataFrame.applymap': ['df ** 2'],
- 'pandas.core.frame.DataFrame.assign': ['*'],
+ },
+ not_implemented_ok={
+ 'pandas.core.frame.DataFrame.isin': ['*'],
+ 'pandas.core.frame.DataFrame.melt': ['*'],
'pandas.core.frame.DataFrame.axes': ['*'],
+ 'pandas.core.frame.DataFrame.count': ['*'],
+ 'pandas.core.frame.DataFrame.reindex': ['*'],
+ 'pandas.core.frame.DataFrame.reindex_axis': ['*'],
+
+ # We should be able to support pivot and pivot_table for categorical
+ # columns
+ 'pandas.core.frame.DataFrame.pivot': ['*'],
+
+ # DataFrame.__getitem__ cannot be used as loc
+ 'pandas.core.frame.DataFrame.query': [
+ 'df[df.A > df.B]', "df[df.B == df['C C']]"
+ ],
+
+ # We can implement this as a zipping operator, but it won't have the
+ # same capability. The doctest includes an example that branches on
+ # a deferred result.
'pandas.core.frame.DataFrame.combine': ['*'],
+
+ # Can be implemented as a zipping operator
'pandas.core.frame.DataFrame.combine_first': ['*'],
- 'pandas.core.frame.DataFrame.compare': ['*'],
+
+ # Difficult to parallelize but should be possible?
'pandas.core.frame.DataFrame.corr': ['*'],
- 'pandas.core.frame.DataFrame.count': ['*'],
'pandas.core.frame.DataFrame.cov': ['*'],
'pandas.core.frame.DataFrame.dot': ['*'],
+
+ # element-wise
+ 'pandas.core.frame.DataFrame.eval': ['*'],
+ 'pandas.core.frame.DataFrame.explode': ['*'],
+
+ # Trivially elementwise for axis=columns. Relies on global indexing
+ # for axis=rows.
'pandas.core.frame.DataFrame.drop': ['*'],
+ 'pandas.core.frame.DataFrame.rename': ['*'],
+ 'pandas.core.frame.DataFrame.apply': ['*'],
+
+ # Zipping operation if input is a DeferredSeries
+ 'pandas.core.frame.DataFrame.assign': ['*'],
+
+ # In theory this is possible for bounded inputs?
+ 'pandas.core.frame.DataFrame.append': ['*'],
+ },
+ skip={
+ 'pandas.core.frame.DataFrame.compare': ['*'],
'pandas.core.frame.DataFrame.drop_duplicates': ['*'],
'pandas.core.frame.DataFrame.duplicated': ['*'],
- 'pandas.core.frame.DataFrame.eval': ['*'],
- 'pandas.core.frame.DataFrame.explode': ['*'],
'pandas.core.frame.DataFrame.groupby': [
- # More keyword arguments.
'df.groupby(level=0).mean()',
'df.groupby(level="Type").mean()',
'df.groupby(by=["b"], dropna=False).sum()',
@@ -101,47 +134,67 @@ class DoctestTest(unittest.TestCase):
],
'pandas.core.frame.DataFrame.idxmax': ['*'],
'pandas.core.frame.DataFrame.idxmin': ['*'],
- 'pandas.core.frame.DataFrame.info': ['*'],
- 'pandas.core.frame.DataFrame.isin': ['*'],
- 'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
- 'pandas.core.frame.DataFrame.melt': ['*'],
- 'pandas.core.frame.DataFrame.memory_usage': ['*'],
+ 'pandas.core.frame.DataFrame.pop': ['*'],
+ 'pandas.core.frame.DataFrame.set_axis': ['*'],
+ 'pandas.core.frame.DataFrame.sort_index': ['*'],
+ 'pandas.core.frame.DataFrame.to_markdown': ['*'],
+ 'pandas.core.frame.DataFrame.to_parquet': ['*'],
+ 'pandas.core.frame.DataFrame.value_counts': ['*'],
+
+ 'pandas.core.frame.DataFrame.to_records': [
+ 'df.index = df.index.rename("I")',
+ 'index_dtypes = f"<S{df.index.str.len().max()}"', # 1.x
+ 'index_dtypes = "<S{}".format(df.index.str.len().max())', #0.x
+ 'df.to_records(index_dtypes=index_dtypes)',
+ ],
+ # These tests use the static method pd.pivot_table, which doesn't
+ # actually raise NotImplementedError
+ 'pandas.core.frame.DataFrame.pivot_table': ['*'],
+ # Expected to raise a ValueError, but we raise NotImplementedError
+ 'pandas.core.frame.DataFrame.pivot': [
+ "df.pivot(index='foo', columns='bar', values='baz')"
+ ],
+ 'pandas.core.frame.DataFrame.append': [
+ 'df',
+ # pylint: disable=line-too-long
+ "pd.concat([pd.DataFrame([i], columns=['A']) for i in range(5)],\n"
+ " ignore_index=True)"
+ ],
+ 'pandas.core.frame.DataFrame.eval': ['df'],
+ # No override for __matmul__ and friends
+ 'pandas.core.frame.DataFrame.dot': ['df @ other'],
+ 'pandas.core.frame.DataFrame.melt': [
+ "df.columns = [list('ABC'), list('DEF')]", "df"
+ ],
'pandas.core.frame.DataFrame.merge': [
# Order-sensitive index, checked in frames_test.py.
"df1.merge(df2, left_on='lkey', right_on='rkey')",
"df1.merge(df2, left_on='lkey', right_on='rkey',\n"
" suffixes=('_left', '_right'))",
],
- # Not equal to df.agg('mode', axis='columns', numeric_only=True)
- 'pandas.core.frame.DataFrame.mode': [
- "df.mode(axis='columns', numeric_only=True)"
- ],
- 'pandas.core.frame.DataFrame.pivot': ['*'],
- 'pandas.core.frame.DataFrame.pivot_table': ['*'],
- 'pandas.core.frame.DataFrame.pop': ['*'],
- 'pandas.core.frame.DataFrame.query': ['*'],
- 'pandas.core.frame.DataFrame.reindex': ['*'],
- # Sets df.index
- 'pandas.core.frame.DataFrame.reindex_axis': ['*'],
- 'pandas.core.frame.DataFrame.rename': ['*'],
# Raises right exception, but testing framework has matching issues.
'pandas.core.frame.DataFrame.replace': [
"df.replace({'a string': 'new value', True: False}) # raises"
],
- # Uses unseeded np.random.
- 'pandas.core.frame.DataFrame.round': ['*'],
- 'pandas.core.frame.DataFrame.set_axis': ['*'],
- 'pandas.core.frame.DataFrame.set_index': ['*'],
- 'pandas.core.frame.DataFrame.sort_index': ['*'],
+ # Should raise WontImplement order-sensitive
+ 'pandas.core.frame.DataFrame.set_index': [
+ "df.set_index([pd.Index([1, 2, 3, 4]), 'year'])",
+ "df.set_index([s, s**2])",
+ ],
+ 'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
+
+ # DeferredSeries has no attribute dtype. Should we allow this and
+ # defer to proxy?
+ 'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
+
+ # Skipped because "seen_wont_implement" is reset before getting to
+ # these calls, so the NameError they raise is not ignored.
+ 'pandas.core.frame.DataFrame.T': [
+ 'df1_transposed.dtypes', 'df2_transposed.dtypes'
+ ],
'pandas.core.frame.DataFrame.transpose': [
'df1_transposed.dtypes', 'df2_transposed.dtypes'
],
- 'pandas.core.frame.DataFrame.to_markdown': ['*'],
- 'pandas.core.frame.DataFrame.to_parquet': ['*'],
- # Uses df.index
- 'pandas.core.frame.DataFrame.to_records': ['*'],
- 'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
- 'pandas.core.frame.DataFrame.value_counts': ['*'],
})
self.assertEqual(result.failed, 0)
@@ -165,6 +218,7 @@ class DoctestTest(unittest.TestCase):
"s.nlargest(3)",
"s.nlargest(3, keep='last')",
],
+ 'pandas.core.series.Series.memory_usage': ['*'],
'pandas.core.series.Series.nsmallest': [
"s.nsmallest()",
"s.nsmallest(3)",
@@ -179,6 +233,9 @@ class DoctestTest(unittest.TestCase):
'pandas.core.series.Series.values': ['*'],
'pandas.core.series.Series.view': ['*'],
},
+ not_implemented_ok={
+ 'pandas.core.series.Series.reindex': ['*'],
+ },
skip={
'pandas.core.series.Series.array': ['*'],
'pandas.core.series.Series.append': ['*'],
@@ -199,12 +256,10 @@ class DoctestTest(unittest.TestCase):
'pandas.core.series.Series.groupby': ['*'],
'pandas.core.series.Series.idxmax': ['*'],
'pandas.core.series.Series.idxmin': ['*'],
- 'pandas.core.series.Series.memory_usage': ['*'],
'pandas.core.series.Series.name': ['*'],
'pandas.core.series.Series.nonzero': ['*'],
'pandas.core.series.Series.pop': ['*'],
'pandas.core.series.Series.quantile': ['*'],
- 'pandas.core.series.Series.reindex': ['*'],
'pandas.core.series.Series.rename': ['*'],
'pandas.core.series.Series.repeat': ['*'],
'pandas.core.series.Series.replace': ['*'],