You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bh...@apache.org on 2020/09/23 22:24:57 UTC

[beam] branch master updated: [BEAM-9547] Roll forward #12858 (#12920)

This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d67200  [BEAM-9547] Roll forward #12858 (#12920)
3d67200 is described below

commit 3d67200af0d7faf968682ac85b196699195905e1
Author: Brian Hulette <bh...@google.com>
AuthorDate: Wed Sep 23 15:24:32 2020 -0700

    [BEAM-9547] Roll forward #12858 (#12920)
    
    * [BEAM-9547] Raise NotImplementedError and WontImplementError throughout DeferredDataframe (#12858)
    
    * Draft of NotImplementedErrors for DataFrame
    
    * Default to jira BEAM-9547
    
    * pivot is NotImplementedError
    
    * remove mistaken change to frames
    
    * Don't use __ror__ to convert df to PCollection, it's already a DataFrame operator
---
 sdks/python/apache_beam/dataframe/frame_base.py    |   7 +
 sdks/python/apache_beam/dataframe/frames.py        | 118 +++++++++++++----
 sdks/python/apache_beam/dataframe/io.py            |  21 ++-
 .../apache_beam/dataframe/pandas_doctests_test.py  | 145 ++++++++++++++-------
 4 files changed, 207 insertions(+), 84 deletions(-)

diff --git a/sdks/python/apache_beam/dataframe/frame_base.py b/sdks/python/apache_beam/dataframe/frame_base.py
index 4a37f74..2d1f2a0 100644
--- a/sdks/python/apache_beam/dataframe/frame_base.py
+++ b/sdks/python/apache_beam/dataframe/frame_base.py
@@ -251,6 +251,13 @@ def wont_implement_method(msg):
   return wrapper
 
 
+def not_implemented_method(op, jira='BEAM-9547'):
+  def wrapper(self, *args, **kwargs):
+    raise NotImplementedError("'%s' is not yet supported (%s)" % (op, jira))
+
+  return wrapper
+
+
 def copy_and_mutate(func):
   def wrapper(self, *args, **kwargs):
     copy = self.copy()
diff --git a/sdks/python/apache_beam/dataframe/frames.py b/sdks/python/apache_beam/dataframe/frames.py
index 5b1dd6d..88b4b79 100644
--- a/sdks/python/apache_beam/dataframe/frames.py
+++ b/sdks/python/apache_beam/dataframe/frames.py
@@ -70,6 +70,8 @@ class DeferredSeries(frame_base.DeferredFrame):
             preserves_partition_by=partitionings.Singleton(),
             requires_partition_by=partitionings.Nothing()))
 
+  reindex = frame_base.not_implemented_method('reindex')
+
   to_numpy = to_string = frame_base.wont_implement_method('non-deferred value')
 
   transform = frame_base._elementwise_method(
@@ -117,6 +119,11 @@ class DeferredSeries(frame_base.DeferredFrame):
 
   head = tail = frame_base.wont_implement_method('order-sensitive')
 
+  memory_usage = frame_base.wont_implement_method('non-deferred value')
+
+  # In Series __contains__ checks the index
+  __contains__ = frame_base.wont_implement_method('non-deferred value')
+
   @frame_base.args_to_kwargs(pd.Series)
   @frame_base.populate_defaults(pd.Series)
   def nlargest(self, keep, **kwargs):
@@ -226,29 +233,6 @@ class DeferredSeries(frame_base.DeferredFrame):
     return _DeferredStringMethods(expr)
 
 
-for base in ['add',
-             'sub',
-             'mul',
-             'div',
-             'truediv',
-             'floordiv',
-             'mod',
-             'pow',
-             'and',
-             'or']:
-  for p in ['%s', 'r%s', '__%s__', '__r%s__']:
-    # TODO: non-trivial level?
-    name = p % base
-    setattr(
-        DeferredSeries,
-        name,
-        frame_base._elementwise_method(name, restrictions={'level': None}))
-  setattr(
-      DeferredSeries,
-      '__i%s__' % base,
-      frame_base._elementwise_method('__i%s__' % base, inplace=True))
-for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
-  setattr(DeferredSeries, name, frame_base._elementwise_method(name))
 for name in ['apply', 'map', 'transform']:
   setattr(DeferredSeries, name, frame_base._elementwise_method(name))
 
@@ -259,6 +243,10 @@ class DeferredDataFrame(frame_base.DeferredFrame):
   def T(self):
     return self.transpose()
 
+  @property
+  def columns(self):
+    return self._expr.proxy().columns
+
   def groupby(self, by):
     # TODO: what happens to the existing index?
     # We set the columns to index as we have a notion of being partitioned by
@@ -280,6 +268,13 @@ class DeferredDataFrame(frame_base.DeferredFrame):
 
   def __getitem__(self, key):
     # TODO: Replicate pd.DataFrame.__getitem__ logic
+    if isinstance(key, frame_base.DeferredBase):
+      # Fail early if key is a DeferredBase as it interacts surprisingly with
+      # key in self._expr.proxy().columns
+      raise NotImplementedError(
+          "Indexing with a deferred frame is not yet supported. Consider "
+          "using df.loc[...]")
+
     if (isinstance(key, list) and
         all(key_column in self._expr.proxy().columns
             for key_column in key)) or key in self._expr.proxy().columns:
@@ -287,6 +282,10 @@ class DeferredDataFrame(frame_base.DeferredFrame):
     else:
       raise NotImplementedError(key)
 
+  def __contains__(self, key):
+    # Checks if proxy has the given column
+    return self._expr.proxy().__contains__(key)
+
   def __setitem__(self, key, value):
     if isinstance(key, str):
       # yapf: disable
@@ -314,13 +313,37 @@ class DeferredDataFrame(frame_base.DeferredFrame):
           requires_partition_by=partitionings.Nothing(),
           preserves_partition_by=partitionings.Nothing()))
 
-  def at(self, *args, **kwargs):
-    raise NotImplementedError()
+  at = frame_base.not_implemented_method('at')
 
   @property
   def loc(self):
     return _DeferredLoc(self)
 
+  _get_index = _set_index = frame_base.not_implemented_method('index')
+  index = property(_get_index, _set_index)
+
+  @property
+  def axes(self):
+    return (self.index, self.columns)
+
+  apply = frame_base.not_implemented_method('apply')
+  explode = frame_base.not_implemented_method('explode')
+  isin = frame_base.not_implemented_method('isin')
+  assign = frame_base.not_implemented_method('assign')
+  append = frame_base.not_implemented_method('append')
+  combine = frame_base.not_implemented_method('combine')
+  combine_first = frame_base.not_implemented_method('combine_first')
+  cov = frame_base.not_implemented_method('cov')
+  corr = frame_base.not_implemented_method('corr')
+  count = frame_base.not_implemented_method('count')
+  dot = frame_base.not_implemented_method('dot')
+  drop = frame_base.not_implemented_method('drop')
+  eval = frame_base.not_implemented_method('eval')
+  reindex = frame_base.not_implemented_method('reindex')
+  melt = frame_base.not_implemented_method('melt')
+  pivot = frame_base.not_implemented_method('pivot')
+  pivot_table = frame_base.not_implemented_method('pivot_table')
+
   def aggregate(self, func, axis=0, *args, **kwargs):
     if axis is None:
       # Aggregate across all elements by first aggregating across columns,
@@ -383,6 +406,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
   applymap = frame_base._elementwise_method('applymap')
 
   memory_usage = frame_base.wont_implement_method('non-deferred value')
+  info = frame_base.wont_implement_method('non-deferred value')
 
   all = frame_base._agg_method('all')
   any = frame_base._agg_method('any')
@@ -398,6 +422,8 @@ class DeferredDataFrame(frame_base.DeferredFrame):
 
   def mode(self, axis=0, *args, **kwargs):
     if axis == 1 or axis == 'columns':
+      # Number of columns is max(number of mode values for each row), so we
+      # can't determine how many there will be before looking at the data.
       raise frame_base.WontImplementError('non-deferred column values')
     return frame_base.DeferredFrame.wrap(
         expressions.ComputedExpression(
@@ -766,8 +792,7 @@ class DeferredDataFrame(frame_base.DeferredFrame):
   transform = frame_base._elementwise_method(
       'transform', restrictions={'axis': 0})
 
-  def transpose(self, *args, **kwargs):
-    raise frame_base.WontImplementError('non-deferred column values')
+  transpose = frame_base.wont_implement_method('non-deferred column values')
 
   def unstack(self, *args, **kwargs):
     if self._expr.proxy().index.nlevels == 1:
@@ -799,7 +824,10 @@ for meth in ('filter', ):
 class DeferredGroupBy(frame_base.DeferredFrame):
   def agg(self, fn):
     if not callable(fn):
-      raise NotImplementedError(fn)
+      # TODO: Add support for strings in (UN)LIFTABLE_AGGREGATIONS. Test by
+      # running doctests for pandas.core.groupby.generic
+      raise NotImplementedError('GroupBy.agg currently only supports callable '
+                                'arguments')
     return DeferredDataFrame(
         expressions.ComputedExpression(
             'agg',
@@ -963,3 +991,37 @@ for method in ELEMENTWISE_STRING_METHODS:
   setattr(_DeferredStringMethods,
           method,
           frame_base._elementwise_method(method))
+
+for base in ['add',
+             'sub',
+             'mul',
+             'div',
+             'truediv',
+             'floordiv',
+             'mod',
+             'pow',
+             'and',
+             'or']:
+  for p in ['%s', 'r%s', '__%s__', '__r%s__']:
+    # TODO: non-trivial level?
+    name = p % base
+    setattr(
+        DeferredSeries,
+        name,
+        frame_base._elementwise_method(name, restrictions={'level': None}))
+    setattr(
+        DeferredDataFrame,
+        name,
+        frame_base._elementwise_method(name, restrictions={'level': None}))
+  setattr(
+      DeferredSeries,
+      '__i%s__' % base,
+      frame_base._elementwise_method('__i%s__' % base, inplace=True))
+  setattr(
+      DeferredDataFrame,
+      '__i%s__' % base,
+      frame_base._elementwise_method('__i%s__' % base, inplace=True))
+
+for name in ['__lt__', '__le__', '__gt__', '__ge__', '__eq__', '__ne__']:
+  setattr(DeferredSeries, name, frame_base._elementwise_method(name))
+  setattr(DeferredDataFrame, name, frame_base._elementwise_method(name))
diff --git a/sdks/python/apache_beam/dataframe/io.py b/sdks/python/apache_beam/dataframe/io.py
index 55c549a..2fc944d 100644
--- a/sdks/python/apache_beam/dataframe/io.py
+++ b/sdks/python/apache_beam/dataframe/io.py
@@ -40,8 +40,14 @@ def read_csv(path, *args, **kwargs):
   return _ReadFromPandas(pd.read_csv, path, args, kwargs, incremental=True)
 
 
+def _as_pc(df):
+  from apache_beam.dataframe import convert  # avoid circular import
+  # TODO(robertwb): Amortize the computation for multiple writes?
+  return convert.to_pcollection(df, yield_elements='pandas')
+
+
 def to_csv(df, path, *args, **kwargs):
-  return df | _WriteToPandas(
+  return _as_pc(df) | _WriteToPandas(
       pd.DataFrame.to_csv, path, args, kwargs, incremental=True, binary=False)
 
 
@@ -68,7 +74,7 @@ def to_json(df, path, orient=None, *args, **kwargs):
     else:
       raise frame_base.WontImplementError('not dataframes or series')
   kwargs['orient'] = orient
-  return df | _WriteToPandas(
+  return _as_pc(df) | _WriteToPandas(
       pd.DataFrame.to_json,
       path,
       args,
@@ -87,7 +93,7 @@ def read_html(path, *args, **kwargs):
 
 
 def to_html(df, path, *args, **kwargs):
-  return df | _WriteToPandas(
+  return _as_pc(df) | _WriteToPandas(
       pd.DataFrame.to_html,
       path,
       args,
@@ -109,7 +115,7 @@ def _binary_writer(format):
       lambda df,
       path,
       *args,
-      **kwargs: df | _WriteToPandas(func, path, args, kwargs))
+      **kwargs: _as_pc(df) | _WriteToPandas(func, path, args, kwargs))
 
 
 for format in ('excel', 'feather', 'parquet', 'stata'):
@@ -214,13 +220,6 @@ class _WriteToPandas(beam.PTransform):
     self.incremental = incremental
     self.binary = binary
 
-  def __ror__(self, other, label=None):
-    if isinstance(other, frame_base.DeferredBase):
-      from apache_beam.dataframe import convert  # avoid circular import
-      # TODO(roberwb): Amortize the computation for multiple writes?
-      other = convert.to_pcollection(other, yield_elements='pandas')
-    return super(_WriteToPandas, self).__ror__(other, label)
-
   def expand(self, pcoll):
     dir, name = io.filesystems.FileSystems.split(self.path)
     return pcoll | fileio.WriteToFiles(
diff --git a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
index 432a016..abc2b7d 100644
--- a/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
+++ b/sdks/python/apache_beam/dataframe/pandas_doctests_test.py
@@ -68,32 +68,65 @@ class DoctestTest(unittest.TestCase):
             ],
             'pandas.core.frame.DataFrame.unstack': ['*'],
             'pandas.core.frame.DataFrame.memory_usage': ['*'],
-        },
-        skip={
-            'pandas.core.frame.DataFrame.T': [
-                'df1_transposed.dtypes', 'df2_transposed.dtypes'
+            'pandas.core.frame.DataFrame.info': ['*'],
+            # Not equal to df.agg('mode', axis='columns', numeric_only=True)
+            # because there can be multiple columns if a row has more than one
+            # mode
+            'pandas.core.frame.DataFrame.mode': [
+                "df.mode(axis='columns', numeric_only=True)"
             ],
-            'pandas.core.frame.DataFrame.agg': ['*'],
-            'pandas.core.frame.DataFrame.aggregate': ['*'],
-            'pandas.core.frame.DataFrame.append': ['*'],
-            'pandas.core.frame.DataFrame.apply': ['*'],
-            'pandas.core.frame.DataFrame.applymap': ['df ** 2'],
-            'pandas.core.frame.DataFrame.assign': ['*'],
+        },
+        not_implemented_ok={
+            'pandas.core.frame.DataFrame.isin': ['*'],
+            'pandas.core.frame.DataFrame.melt': ['*'],
             'pandas.core.frame.DataFrame.axes': ['*'],
+            'pandas.core.frame.DataFrame.count': ['*'],
+            'pandas.core.frame.DataFrame.reindex': ['*'],
+            'pandas.core.frame.DataFrame.reindex_axis': ['*'],
+
+            # We should be able to support pivot and pivot_table for categorical
+            # columns
+            'pandas.core.frame.DataFrame.pivot': ['*'],
+
+            # DataFrame.__getitem__ cannot be used as loc
+            'pandas.core.frame.DataFrame.query': [
+                'df[df.A > df.B]', "df[df.B == df['C C']]"
+            ],
+
+            # We can implement this as a zipping operator, but it won't have the
+            # same capability. The doctest includes an example that branches on
+            # a deferred result.
             'pandas.core.frame.DataFrame.combine': ['*'],
+
+            # Can be implemented as a zipping operator
             'pandas.core.frame.DataFrame.combine_first': ['*'],
-            'pandas.core.frame.DataFrame.compare': ['*'],
+
+            # Difficult to parallelize but should be possible?
             'pandas.core.frame.DataFrame.corr': ['*'],
-            'pandas.core.frame.DataFrame.count': ['*'],
             'pandas.core.frame.DataFrame.cov': ['*'],
             'pandas.core.frame.DataFrame.dot': ['*'],
+
+            # element-wise
+            'pandas.core.frame.DataFrame.eval': ['*'],
+            'pandas.core.frame.DataFrame.explode': ['*'],
+
+            # Trivially elementwise for axis=columns. Relies on global indexing
+            # for axis=rows.
             'pandas.core.frame.DataFrame.drop': ['*'],
+            'pandas.core.frame.DataFrame.rename': ['*'],
+            'pandas.core.frame.DataFrame.apply': ['*'],
+
+            # Zipping operation if input is a DeferredSeries
+            'pandas.core.frame.DataFrame.assign': ['*'],
+
+            # In theory this is possible for bounded inputs?
+            'pandas.core.frame.DataFrame.append': ['*'],
+        },
+        skip={
+            'pandas.core.frame.DataFrame.compare': ['*'],
             'pandas.core.frame.DataFrame.drop_duplicates': ['*'],
             'pandas.core.frame.DataFrame.duplicated': ['*'],
-            'pandas.core.frame.DataFrame.eval': ['*'],
-            'pandas.core.frame.DataFrame.explode': ['*'],
             'pandas.core.frame.DataFrame.groupby': [
-                # More keyword arguments.
                 'df.groupby(level=0).mean()',
                 'df.groupby(level="Type").mean()',
                 'df.groupby(by=["b"], dropna=False).sum()',
@@ -101,47 +134,67 @@ class DoctestTest(unittest.TestCase):
             ],
             'pandas.core.frame.DataFrame.idxmax': ['*'],
             'pandas.core.frame.DataFrame.idxmin': ['*'],
-            'pandas.core.frame.DataFrame.info': ['*'],
-            'pandas.core.frame.DataFrame.isin': ['*'],
-            'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
-            'pandas.core.frame.DataFrame.melt': ['*'],
-            'pandas.core.frame.DataFrame.memory_usage': ['*'],
+            'pandas.core.frame.DataFrame.pop': ['*'],
+            'pandas.core.frame.DataFrame.set_axis': ['*'],
+            'pandas.core.frame.DataFrame.sort_index': ['*'],
+            'pandas.core.frame.DataFrame.to_markdown': ['*'],
+            'pandas.core.frame.DataFrame.to_parquet': ['*'],
+            'pandas.core.frame.DataFrame.value_counts': ['*'],
+
+            'pandas.core.frame.DataFrame.to_records': [
+                'df.index = df.index.rename("I")',
+                'index_dtypes = f"<S{df.index.str.len().max()}"', # 1.x
+                'index_dtypes = "<S{}".format(df.index.str.len().max())', #0.x
+                'df.to_records(index_dtypes=index_dtypes)',
+            ],
+            # These tests use the static method pd.pivot_table, which doesn't
+            # actually raise NotImplementedError
+            'pandas.core.frame.DataFrame.pivot_table': ['*'],
+            # Expected to raise a ValueError, but we raise NotImplementedError
+            'pandas.core.frame.DataFrame.pivot': [
+                "df.pivot(index='foo', columns='bar', values='baz')"
+            ],
+            'pandas.core.frame.DataFrame.append': [
+                'df',
+                # pylint: disable=line-too-long
+                "pd.concat([pd.DataFrame([i], columns=['A']) for i in range(5)],\n"
+                "          ignore_index=True)"
+            ],
+            'pandas.core.frame.DataFrame.eval': ['df'],
+            # No override for __matmul__ and friends
+            'pandas.core.frame.DataFrame.dot': ['df @ other'],
+            'pandas.core.frame.DataFrame.melt': [
+                "df.columns = [list('ABC'), list('DEF')]", "df"
+            ],
             'pandas.core.frame.DataFrame.merge': [
                 # Order-sensitive index, checked in frames_test.py.
                 "df1.merge(df2, left_on='lkey', right_on='rkey')",
                 "df1.merge(df2, left_on='lkey', right_on='rkey',\n"
                 "          suffixes=('_left', '_right'))",
             ],
-            # Not equal to df.agg('mode', axis='columns', numeric_only=True)
-            'pandas.core.frame.DataFrame.mode': [
-                "df.mode(axis='columns', numeric_only=True)"
-            ],
-            'pandas.core.frame.DataFrame.pivot': ['*'],
-            'pandas.core.frame.DataFrame.pivot_table': ['*'],
-            'pandas.core.frame.DataFrame.pop': ['*'],
-            'pandas.core.frame.DataFrame.query': ['*'],
-            'pandas.core.frame.DataFrame.reindex': ['*'],
-            # Sets df.index
-            'pandas.core.frame.DataFrame.reindex_axis': ['*'],
-            'pandas.core.frame.DataFrame.rename': ['*'],
             # Raises right exception, but testing framework has matching issues.
             'pandas.core.frame.DataFrame.replace': [
                 "df.replace({'a string': 'new value', True: False})  # raises"
             ],
-            # Uses unseeded np.random.
-            'pandas.core.frame.DataFrame.round': ['*'],
-            'pandas.core.frame.DataFrame.set_axis': ['*'],
-            'pandas.core.frame.DataFrame.set_index': ['*'],
-            'pandas.core.frame.DataFrame.sort_index': ['*'],
+            # Should raise WontImplement order-sensitive
+            'pandas.core.frame.DataFrame.set_index': [
+                "df.set_index([pd.Index([1, 2, 3, 4]), 'year'])",
+                "df.set_index([s, s**2])",
+            ],
+            'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
+
+            # DeferredSeries has no attribute dtype. Should we allow this and
+            # defer to proxy?
+            'pandas.core.frame.DataFrame.iterrows': ["print(df['int'].dtype)"],
+
+            # Skipped because "seen_wont_implement" is reset before getting to
+            # these calls, so the NameError they raise is not ignored.
+            'pandas.core.frame.DataFrame.T': [
+                'df1_transposed.dtypes', 'df2_transposed.dtypes'
+            ],
             'pandas.core.frame.DataFrame.transpose': [
                 'df1_transposed.dtypes', 'df2_transposed.dtypes'
             ],
-            'pandas.core.frame.DataFrame.to_markdown': ['*'],
-            'pandas.core.frame.DataFrame.to_parquet': ['*'],
-            # Uses df.index
-            'pandas.core.frame.DataFrame.to_records': ['*'],
-            'pandas.core.frame.DataFrame.to_sparse': ['type(df)'],
-            'pandas.core.frame.DataFrame.value_counts': ['*'],
         })
     self.assertEqual(result.failed, 0)
 
@@ -165,6 +218,7 @@ class DoctestTest(unittest.TestCase):
                 "s.nlargest(3)",
                 "s.nlargest(3, keep='last')",
             ],
+            'pandas.core.series.Series.memory_usage': ['*'],
             'pandas.core.series.Series.nsmallest': [
                 "s.nsmallest()",
                 "s.nsmallest(3)",
@@ -179,6 +233,9 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.series.Series.values': ['*'],
             'pandas.core.series.Series.view': ['*'],
         },
+        not_implemented_ok={
+            'pandas.core.series.Series.reindex': ['*'],
+        },
         skip={
             'pandas.core.series.Series.array': ['*'],
             'pandas.core.series.Series.append': ['*'],
@@ -199,12 +256,10 @@ class DoctestTest(unittest.TestCase):
             'pandas.core.series.Series.groupby': ['*'],
             'pandas.core.series.Series.idxmax': ['*'],
             'pandas.core.series.Series.idxmin': ['*'],
-            'pandas.core.series.Series.memory_usage': ['*'],
             'pandas.core.series.Series.name': ['*'],
             'pandas.core.series.Series.nonzero': ['*'],
             'pandas.core.series.Series.pop': ['*'],
             'pandas.core.series.Series.quantile': ['*'],
-            'pandas.core.series.Series.reindex': ['*'],
             'pandas.core.series.Series.rename': ['*'],
             'pandas.core.series.Series.repeat': ['*'],
             'pandas.core.series.Series.replace': ['*'],