Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/14 22:42:19 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #13082: [BEAM-9547] Flesh out dataframe groupby (and related) implementation.

TheNeuralBit commented on a change in pull request #13082:
URL: https://github.com/apache/beam/pull/13082#discussion_r504989018



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -27,12 +28,118 @@
 from apache_beam.dataframe import partitionings
 
 
-@frame_base.DeferredFrame._register_for(pd.Series)
-class DeferredSeries(frame_base.DeferredFrame):
+class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
   def __array__(self, dtype=None):
     raise frame_base.WontImplementError(
         'Conversion to a non-deferred numpy array.')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def droplevel(self, level, axis):
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'droplevel',
+            lambda df: df.droplevel(level, axis=axis), [self._expr],
+            requires_partition_by=partitionings.Nothing(),
+            preserves_partition_by=partitionings.Index()
+            if axis in (1, 'columns') else partitionings.Nothing()))
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def groupby(self, by, level, axis, as_index, group_keys, **kwargs):

Review comment:
       Could you update pandas_doctests_test?  It looks like the skipped DataFrame.groupby tests should pass now, and hopefully the Series.groupby ones as well.
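      For reference, the currently skipped DataFrame.groupby doctests exercise
      patterns like the one below. This is just the standard pandas docstring
      example reproduced as plain pandas (not Beam code, and not the test file
      itself); if the new implementation handles it, the corresponding skip
      entries should be removable.

          import pandas as pd

          # The canonical DataFrame.groupby docstring example: group rows by a
          # column and aggregate the remaining numeric column.
          df = pd.DataFrame({'Animal': ['Falcon', 'Falcon', 'Parrot', 'Parrot'],
                             'Max Speed': [380., 370., 24., 26.]})
          print(df.groupby('Animal').mean())
          #          Max Speed
          # Animal
          # Falcon       375.0
          # Parrot        25.0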

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1120,6 +1222,15 @@ def agg(self, fn):
             requires_partition_by=partitionings.Index(),
             preserves_partition_by=partitionings.Singleton()))
 
+  aggregate = agg
+
+  first = last = head = tail = frame_base.not_implemented_method(
+      'order sensitive')
+
+  __len__ = frame_base.wont_implement_method('non-deferred')

Review comment:
       Should we consider implementing this and `groups` for categorical grouping keys?
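      A sketch (plain pandas, not Beam code) of why categorical keys make this
      feasible: with a categorical key the set of candidate group labels lives
      on the dtype, so it is available from the proxy without materializing the
      deferred data.

          import pandas as pd

          df = pd.DataFrame({
              'key': pd.Categorical(['a', 'b', 'a'], categories=['a', 'b', 'c']),
              'val': [1, 2, 3],
          })
          # The candidate group labels come from the dtype, not from scanning
          # the data:
          print(df['key'].dtype.categories)  # Index(['a', 'b', 'c'], dtype='object')
          # So the number of groups (and hence a deferred __len__) is knowable
          # from the proxy's dtype alone, modulo the observed= setting; only the
          # per-group row labels in .groups would need the actual data.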

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -27,12 +28,118 @@
 from apache_beam.dataframe import partitionings
 
 
-@frame_base.DeferredFrame._register_for(pd.Series)
-class DeferredSeries(frame_base.DeferredFrame):
+class DeferredDataFrameOrSeries(frame_base.DeferredFrame):
   def __array__(self, dtype=None):
     raise frame_base.WontImplementError(
         'Conversion to a non-deferred numpy array.')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def droplevel(self, level, axis):
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'droplevel',
+            lambda df: df.droplevel(level, axis=axis), [self._expr],
+            requires_partition_by=partitionings.Nothing(),
+            preserves_partition_by=partitionings.Index()
+            if axis in (1, 'columns') else partitionings.Nothing()))
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def groupby(self, by, level, axis, as_index, group_keys, **kwargs):
+    if not as_index:
+      raise NotImplementedError('groupby(as_index=False)')
+    if not group_keys:
+      raise NotImplementedError('groupby(group_keys=False)')
+
+    if axis in (1, 'columns'):
+      return _DeferredGroupByCols(
+          expressions.ComputedExpression(
+              'groupbycols',
+              lambda df: df.groupby(by, axis=axis, **kwargs), [self._expr],
+              requires_partition_by=partitionings.Nothing(),
+              preserves_partition_by=partitionings.Index()))
+
+    if level is None and by is None:
+      raise TypeError("You have to supply one of 'by' and 'level'")
+
+    elif level is not None:
+      if isinstance(level, (list, tuple)):
+        levels = level
+      else:
+        levels = [level]
+      all_levels = self._expr.proxy().index.names
+      levels = [all_levels[i] if isinstance(i, int) else i for i in levels]
+      levels_to_drop = self._expr.proxy().index.names.difference(levels)
+      if levels_to_drop:
+        to_group = self.droplevel(levels_to_drop)._expr
+      else:
+        to_group = self._expr
+
+    elif callable(by):
+
+      def map_index(df):
+        df = df.copy()
+        df.index = df.index.map(by)
+        return df
+
+      to_group = expressions.ComputedExpression(
+          'map_index',
+          map_index, [self._expr],
+          requires_partition_by=partitionings.Nothing(),
+          preserves_partition_by=partitionings.Singleton())
+
+    elif isinstance(by, DeferredSeries):
+
+      if isinstance(self, DeferredSeries):
+
+        def set_index(s, by):
+          df = pd.DataFrame(s)
+          df, by = df.align(by, axis=0)
+          return df.set_index(by).iloc[:, 0]
+      else:
+
+        def set_index(df, by):
+          df, by = df.align(by, axis=0)
+          return df.set_index(by)
+
+      to_group = expressions.ComputedExpression(
+          'set_index',
+          set_index,  #
+          [self._expr, by._expr],
+          requires_partition_by=partitionings.Index(),
+          preserves_partition_by=partitionings.Singleton())
+
+    elif isinstance(by, np.ndarray):
+      raise frame_base.WontImplementError('order sensitive')
+
+    else:
+      if not isinstance(by, list):

Review comment:
      I was going to suggest allowing a tuple here, but after looking into it I realized pandas' groupby intentionally does the same thing (it doesn't treat a tuple as a list of keys), since a tuple could itself be a single field name. TIL
   
   https://github.com/pandas-dev/pandas/blob/d7a5b838d8d6234f6bec5a30bfa33b24bd4afbd9/pandas/core/groupby/grouper.py#L713-L718
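      A small pandas sketch of the ambiguity the linked code resolves: with
      MultiIndex columns a tuple is itself a single column label, so treating
      it as a list of keys would be wrong.

          import pandas as pd

          df = pd.DataFrame(
              [[1, 2, 3], [1, 5, 6]],
              columns=pd.MultiIndex.from_tuples([('a', 'x'), ('a', 'y'), ('b', 'z')]))
          # ('a', 'x') names one column here, so groupby treats the tuple as a
          # single key rather than as the two keys 'a' and 'x'.
          print(df.groupby(('a', 'x')).sum())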




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org