You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/06/14 19:07:46 UTC

[GitHub] [beam] kennknowles commented on a change in pull request #14992: [BEAM-9547] Add implementation for first and last

kennknowles commented on a change in pull request #14992:
URL: https://github.com/apache/beam/pull/14992#discussion_r651203658



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3210,17 +3230,15 @@ class _DeferredGroupByCols(frame_base.DeferredFrame):
   diff = frame_base._elementwise_method('diff', base=DataFrameGroupBy)
   fillna = frame_base._elementwise_method('fillna', base=DataFrameGroupBy)
   filter = frame_base._elementwise_method('filter', base=DataFrameGroupBy)
-  first = frame_base.wont_implement_method(
-      DataFrameGroupBy, 'first', reason="order-sensitive")
+  first = frame_base._elementwise_method('first', base=DataFrameGroupBy)

Review comment:
       Does putting it here cause it to do a `first` globally?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -253,6 +253,36 @@ def fillna(self, value, method, axis, limit, **kwargs):
   backfill = _fillna_alias('backfill')
   pad = _fillna_alias('pad')
 
+  @frame_base.with_docs_from(pd.DataFrame)
+  def first(self, offset):
+    per_partition = expressions.ComputedExpression(
+        'first-per-partition',
+        lambda df: df.sort_index().first(offset=offset), [self._expr],
+        preserves_partition_by=partitionings.Arbitrary(),
+        requires_partition_by=partitionings.Arbitrary())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'first',
+              lambda df: df.sort_index().first(offset=offset), [per_partition],
+              preserves_partition_by=partitionings.Arbitrary(),
+              requires_partition_by=partitionings.Singleton()))

Review comment:
       This is the bit that causes it to feed in the whole dataframe to the DoFn at once?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -253,6 +253,36 @@ def fillna(self, value, method, axis, limit, **kwargs):
   backfill = _fillna_alias('backfill')
   pad = _fillna_alias('pad')
 
+  @frame_base.with_docs_from(pd.DataFrame)
+  def first(self, offset):
+    per_partition = expressions.ComputedExpression(
+        'first-per-partition',
+        lambda df: df.sort_index().first(offset=offset), [self._expr],
+        preserves_partition_by=partitionings.Arbitrary(),

Review comment:
       I'm assuming this means that it preserves no partitioning?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -3037,10 +3059,8 @@ def do_partition_apply(df):
   tail = frame_base.wont_implement_method(
       DataFrameGroupBy, 'tail', explanation=_PEEK_METHOD_EXPLANATION)
 
-  first = frame_base.wont_implement_method(
-      DataFrameGroupBy, 'first', reason='order-sensitive')
-  last = frame_base.wont_implement_method(
-      DataFrameGroupBy, 'last', reason='order-sensitive')
+  first = frame_base.not_implemented_method('first')

Review comment:
       What's the difference here? I was looking at adjacent lines to see if there were any analogous functions, to understand at a high level, but I don't see any.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org