Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/03/29 18:22:17 UTC

[GitHub] [beam] TheNeuralBit commented on a change in pull request #13843: [BEAM-11628] Add initial support for GroupBy.apply

TheNeuralBit commented on a change in pull request #13843:
URL: https://github.com/apache/beam/pull/13843#discussion_r603514511



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1785,9 +1855,63 @@ def agg(self, fn):
     return DeferredDataFrame(
         expressions.ComputedExpression(
             'agg',
-            lambda df: df.agg(fn), [self._expr],
+            lambda gb: gb.agg(fn), [self._expr],
             requires_partition_by=partitionings.Index(),
-            preserves_partition_by=partitionings.Arbitrary()))
+            preserves_partition_by=partitionings.Singleton()))
+
+  def apply(self, fn, *args, **kwargs):
+    if self._grouping_columns and not self._projection:
+      grouping_columns = self._grouping_columns
+      def fn_wrapper(x, *args, **kwargs):
+        # TODO(BEAM-11710): Moving a column to an index and back is lossy
+        # since indexes dont support as many dtypes. We should keep the original
+        # column in groupby() instead. We need it anyway in case the grouping
+        # column is projected, which is allowed.
+
+        # Move the columns back to columns
+        x = x.assign(**{col: x.index.get_level_values(col)
+                        for col in grouping_columns})
+        x = x.droplevel(grouping_columns)
+        return fn(x, *args, **kwargs)
+    else:
+      fn_wrapper = fn
+
+    project = _maybe_project_func(self._projection)
+
+    # Unfortunately pandas does not execute fn to determine the right proxy.
+    # We run user fn on a proxy here to detect the return type and generate the
+    # proxy.
+    result = fn_wrapper(project(self._ungrouped_with_index.proxy()))
+    if isinstance(result, pd.core.generic.NDFrame):
+      proxy = result[:0]
+
+      def index_to_arrays(index):
+        return [index.get_level_values(level)
+                for level in range(index.nlevels)]
+
+      # The final result will have the grouped indexes + the indexes from the
+      # result
+      proxy.index = pd.MultiIndex.from_arrays(
+          index_to_arrays(self._ungrouped.proxy().index) +
+          index_to_arrays(proxy.index),
+          names=self._ungrouped.proxy().index.names + proxy.index.names)
+    else:
+      dtype = pd.Series([result]).dtype

Review comment:
       It's not just for ease of later processing; it's because the pandas implementation combines these individual scalar results into a Series.
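
       To illustrate the point, here is a minimal sketch using plain pandas, not the Beam code. The toy frame, the column names `group` and `value`, and the helper `total` are made up for illustration. It shows a scalar-returning function passed to `groupby(...).apply` coming back as a Series, and the dtype being inferred the same way as in the `dtype = pd.Series([result]).dtype` line above.

```python
import pandas as pd

# Hypothetical toy data; column names are assumptions for this sketch.
df = pd.DataFrame({"group": ["a", "a", "b"], "value": [1, 2, 10]})


def total(g):
    # Returns a single scalar per group.
    return g["value"].sum()


# pandas collects the per-group scalars into one Series indexed by group key.
result = df.groupby("group")[["value"]].apply(total)

# Infer the dtype from a single scalar result by wrapping it in a
# one-element Series, then build an empty Series of that dtype as a proxy.
scalar = total(df[["value"]])
dtype = pd.Series([scalar]).dtype
proxy = pd.Series([], dtype=dtype)

print(type(result).__name__, result.dtype, proxy.dtype)
```

       Because the real result is a Series, a Series proxy with the inferred dtype matches what pandas actually produces for this case.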



