Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/10 19:33:21 UTC

[GitHub] [beam] robertwb opened a new pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

robertwb opened a new pull request #12516:
URL: https://github.com/apache/beam/pull/12516


   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12516:
URL: https://github.com/apache/beam/pull/12516#issuecomment-672277371


   > This is going to take me a while to review since I have very little Python experience and even less Pandas experience. If you want a more thorough, faster review, I suggest adding someone else. Otherwise I'll make my way through the PR this week.
   
   This week would be fine, if you're up to it. I'm trying to spread the load (and knowledge) a bit around the team. Feel free to ask clarifying questions. 





[GitHub] [beam] robertwb commented on a change in pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12516:
URL: https://github.com/apache/beam/pull/12516#discussion_r473172807



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):

Review comment:
       All of these public methods are proxies for the (extensively documented) methods of the same name in Pandas. 

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -121,6 +130,12 @@ def test_series_tests(self):
             'pandas.core.series.Series.cumsum': ['*'],
             'pandas.core.series.Series.cumprod': ['*'],
             'pandas.core.series.Series.diff': ['*'],
+            'pandas.core.series.Series.nlargest': [
+                "s.nlargest(3, keep='last')"

Review comment:
       Same.

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(

Review comment:
       Technically, it's the Expressions that they wrap that are passed down to workers for execution, but yes, that's the basic idea. 
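As a rough illustration of the two-stage shape in the quoted `nlargest` (a per-partition pass, then a second pass over the combined survivors), here is a sketch in plain pandas. It does not use Beam at all; the hand-made `partitions` list and the sample data are assumptions made only for illustration.

```python
import pandas as pd

s = pd.Series([5, 1, 5, 3, 4, 5, 2, 4], index=list("abcdefgh"))
n = 2

# Hypothetical partitioning: slice the series into three arbitrary chunks.
partitions = [s.iloc[:3], s.iloc[3:6], s.iloc[6:]]

# Stage 1: each partition independently keeps its own top n (ties included).
per_partition = [p.nlargest(n, keep="all") for p in partitions]

# Stage 2: gather the survivors in one place and take the top n again.
two_stage = pd.concat(per_partition).nlargest(n, keep="all")

# Reference: the same operation on the unpartitioned series.
direct = s.nlargest(n, keep="all")

print(sorted(two_stage.index))
print(sorted(direct.index))
```

The first stage can run on each partition in parallel because it only discards rows that cannot be in the final answer; only the much smaller second stage needs all of its input in one place.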

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -200,9 +236,178 @@ def dropna(self, axis, **kwargs):
   items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
       'non-lazy')
 
+  def _cols_as_temporary_index(self, cols, suffix=''):
+    original_index_names = list(self._expr.proxy().index.names)
+    new_index_names = [
+        '__apache_beam_temp_%d_%s' % (ix, suffix)
+        for (ix, _) in enumerate(original_index_names)]
+    def revert(df):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join_restoreindex',
+              lambda df:
+                  df.reset_index().set_index(new_index_names)
+                  .rename_axis(index=original_index_names, copy=False),
+              [df._expr],
+              preserves_partition_by=partitionings.Nothing(),
+              requires_partition_by=partitionings.Nothing()))
+    reindexed = frame_base.DeferredFrame.wrap(

Review comment:
       Good question. Only because it was always invoked right away. Updated to be a method for symmetry. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nlargest',
+              lambda df: df.nlargest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  def nsmallest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nsmallest-per-partition',
+        lambda df: df.nsmallest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nsmallest',
+              lambda df: df.nsmallest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  rename_axis = frame_base._elementwise_method('rename_axis')

Review comment:
       Given I'm going for full coverage, I tried to keep them in basic alphabetical order, unless there is an obvious relationship between methods. 
   
   As for `_elementwise_method`, it creates a method that returns `expressions.ComputedExpression(name, lambda df: getattr(df, name)(args), self._expr, preserves=Singleton(), requires=Nothing)`, and it also handles a mix of deferred and non-deferred arguments. The pattern was common enough that I made a helper for it.
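To make the helper pattern concrete, here is a toy sketch of a method factory that forwards to the pandas method of the same name while deferring the actual work. It is not the Beam implementation: `ToyDeferred` and this `elementwise_method` are hypothetical names, and the handling of mixed deferred and non-deferred arguments, as well as the partitioning metadata, is left out.

```python
import pandas as pd


def elementwise_method(name):
    """Build a method that defers to the pandas method called `name`."""
    def method(self, *args, **kwargs):
        # Nothing is computed here; we only record how to compute it later.
        return ToyDeferred(
            lambda: getattr(self.compute(), name)(*args, **kwargs))
    method.__name__ = name
    return method


class ToyDeferred:
    """Hypothetical stand-in for a deferred frame: wraps a zero-arg thunk."""

    def __init__(self, thunk):
        self._thunk = thunk

    def compute(self):
        return self._thunk()

    # One line per proxied method, mirroring the one-liner in the diff.
    abs = elementwise_method("abs")
    rename_axis = elementwise_method("rename_axis")


deferred = ToyDeferred(lambda: pd.Series([-1, 2, -3]))
result = deferred.abs().rename_axis("row").compute()
print(result)
```

Each proxied method costs a single class-level assignment, which is the main appeal of the factory when aiming for broad API coverage.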

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -43,6 +43,13 @@ def test_dataframe_tests(self):
             'pandas.core.frame.DataFrame.itertuples': ['*'],
             'pandas.core.frame.DataFrame.iterrows': ['*'],
             'pandas.core.frame.DataFrame.iteritems': ['*'],
+            'pandas.core.frame.DataFrame.nlargest': [
+                "df.nlargest(3, 'population', keep='last')"

Review comment:
       This is skipping a specific test that exists in the docstring. 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')

Review comment:
       When there is a tie, a specification like "first" or "last" makes the result depend on the ordering to decide which ones to keep. With "all" we may return more than n occurrences (as in Pandas) but we no longer have to worry about keeping (or discarding) the right ones. 
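A small pandas example may make the tie behaviour easier to see. The data and labels below are made up for illustration; only `Series.nlargest` and its `keep` parameter come from pandas.

```python
import pandas as pd

s = pd.Series([3, 1, 3, 2, 3], index=["a", "b", "c", "d", "e"])
reordered = s.iloc[::-1]  # same rows, opposite order

# keep='first' breaks the three-way tie at 3 by position, so the chosen
# labels change when the rows arrive in a different order.
first_original = set(s.nlargest(2, keep="first").index)
first_reordered = set(reordered.nlargest(2, keep="first").index)

# keep='all' returns every row tied with the n-th largest value, so the
# result may have more than n rows but does not depend on row order.
all_original = set(s.nlargest(2, keep="all").index)
all_reordered = set(reordered.nlargest(2, keep="all").index)

print(first_original, first_reordered)
print(all_original, all_reordered)
```

Since a distributed collection has no defined row order, only the `keep='all'` result is well defined there.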







[GitHub] [beam] robertwb commented on a change in pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12516:
URL: https://github.com/apache/beam/pull/12516#discussion_r468862256



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest',

Review comment:
       Good point, updated.







[GitHub] [beam] robertwb merged pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #12516:
URL: https://github.com/apache/beam/pull/12516


   





[GitHub] [beam] tysonjh commented on a change in pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
tysonjh commented on a change in pull request #12516:
URL: https://github.com/apache/beam/pull/12516#discussion_r470303371



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nlargest',
+              lambda df: df.nlargest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  @frame_base.args_to_kwargs(pd.Series)
+  def nsmallest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nsmallest-per-partition',
+        lambda df: df.nsmallest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'nsmallest',
+              lambda df: df.nsmallest(**kwargs), [per_partition],
+              preserves_partition_by=partitionings.Singleton(),
+              requires_partition_by=partitionings.Singleton()))
+
+  rename_axis = frame_base._elementwise_method('rename_axis')

Review comment:
       Should this be up with the other elementwise_methods (e.g. `prod`) in the class? Or closer to the call site? I'm just trying to understand why it is here.
   
   Also, I tried to follow along the implementation of `elementwise_method` but got lost at the `_proxy_function`. What does this do?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')

Review comment:
       How does 'keep' relate to the order sensitivity mentioned in the error? Using keep='all' can return a Series with more than n elements. Shouldn't this be using 'first' to return the first n occurrences?

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -121,6 +130,12 @@ def test_series_tests(self):
             'pandas.core.series.Series.cumsum': ['*'],
             'pandas.core.series.Series.cumprod': ['*'],
             'pandas.core.series.Series.diff': ['*'],
+            'pandas.core.series.Series.nlargest': [
+                "s.nlargest(3, keep='last')"

Review comment:
       'first' here too? 

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest-per-partition',
+        lambda df: df.nlargest(**kwargs), [self._expr],
+        preserves_partition_by=partitionings.Singleton(),
+        requires_partition_by=partitionings.Nothing())
+    with expressions.allow_non_parallel_operations(True):
+      return frame_base.DeferredFrame.wrap(

Review comment:
       Are `DeferredFrame`s expressions that are configured at pipeline construction time but passed to workers for execution?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -200,9 +236,178 @@ def dropna(self, axis, **kwargs):
   items = itertuples = iterrows = iteritems = frame_base.wont_implement_method(
       'non-lazy')
 
+  def _cols_as_temporary_index(self, cols, suffix=''):
+    original_index_names = list(self._expr.proxy().index.names)
+    new_index_names = [
+        '__apache_beam_temp_%d_%s' % (ix, suffix)
+        for (ix, _) in enumerate(original_index_names)]
+    def revert(df):
+      return frame_base.DeferredFrame.wrap(
+          expressions.ComputedExpression(
+              'join_restoreindex',
+              lambda df:
+                  df.reset_index().set_index(new_index_names)
+                  .rename_axis(index=original_index_names, copy=False),
+              [df._expr],
+              preserves_partition_by=partitionings.Nothing(),
+              requires_partition_by=partitionings.Nothing()))
+    reindexed = frame_base.DeferredFrame.wrap(

Review comment:
       Why is this not defined as a method like `revert` is above?

##########
File path: sdks/python/apache_beam/dataframe/pandas_doctests_test.py
##########
@@ -43,6 +43,13 @@ def test_dataframe_tests(self):
             'pandas.core.frame.DataFrame.itertuples': ['*'],
             'pandas.core.frame.DataFrame.iterrows': ['*'],
             'pandas.core.frame.DataFrame.iteritems': ['*'],
+            'pandas.core.frame.DataFrame.nlargest': [
+                "df.nlargest(3, 'population', keep='last')"

Review comment:
       keep='first' as well?

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):

Review comment:
       Documentation? The method name is fairly self-documenting, but could we at least document the parameters? I had to look them up in pd.Series. The same goes for the other public methods in this class: documentation would go a long way towards making this more accessible for contributors.







[GitHub] [beam] tysonjh commented on a change in pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
tysonjh commented on a change in pull request #12516:
URL: https://github.com/apache/beam/pull/12516#discussion_r468707336



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -54,6 +54,42 @@ def agg(self, *args, **kwargs):
       'order-sensitive')
   diff = frame_base.wont_implement_method('order-sensitive')
 
+  @frame_base.args_to_kwargs(pd.Series)
+  def nlargest(self, **kwargs):
+    if 'keep' in kwargs and kwargs['keep'] != 'all':
+      raise frame_base.WontImplementError('order-sensitive')
+    per_partition = expressions.ComputedExpression(
+        'nlargest',

Review comment:
       Would using 'nlargest' for the name of both the per-partition and global computed expression cause confusion for debugging or UX?







[GitHub] [beam] robertwb commented on pull request #12516: [BEAM-9547] Implement dataframes top, join, merge.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12516:
URL: https://github.com/apache/beam/pull/12516#issuecomment-671550524


   R: @tysonjh

