Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/19 20:06:50 UTC

[GitHub] [beam] TheNeuralBit opened a new pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit opened a new pull request #13139:
URL: https://github.com/apache/beam/pull/13139


   This PR primarily adds support for `DataFrame.{drop,explode}`.
   
   I also noticed an edge case affecting the generated proxy in `rename`, so this PR also changes `TransformTest.run_scenario` to verify that the deferred DataFrame's proxy matches the shape of the expected output. This check caught two issues: the `rename` issue mentioned above (now verified in a new test) and a problem in `str.repeat`.
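For readers less familiar with the pandas methods being wrapped, here is a short standalone sketch of their semantics (plain pandas, no Beam; the sample data is made up). It illustrates why, in the diff below, `explode` is declared to preserve the existing partitioning only when `ignore_index` is false: by default each exploded row keeps the index label of its source row, whereas `ignore_index=True` relabels the result.

```py
import pandas as pd

# Made-up sample data: a list-valued column and a non-default index.
df = pd.DataFrame({'id': ['a', 'b'], 'vals': [[1, 2], [3]]}, index=[10, 20])

# By default each exploded row keeps the index label of its source row.
kept = df.explode('vals')
print(kept.index.tolist())   # [10, 10, 20]

# With ignore_index=True the result is relabeled 0..n-1, so the original
# index (and anything keyed on it) is lost.
reset = df.explode('vals', ignore_index=True)
print(reset.index.tolist())  # [0, 1, 2]

# drop removes labels along an axis, here a column.
print(df.drop(columns=['id']).columns.tolist())  # ['vals']
```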
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] robertwb commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513682982



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -619,21 +619,75 @@ def assign(self, **kwargs):
                                             "instances are supported.")
     return frame_base._elementwise_method('assign')(self, **kwargs)
 
-
   apply = frame_base.not_implemented_method('apply')
-  explode = frame_base.not_implemented_method('explode')
   isin = frame_base.not_implemented_method('isin')
   append = frame_base.not_implemented_method('append')
   combine = frame_base.not_implemented_method('combine')
   combine_first = frame_base.not_implemented_method('combine_first')
   count = frame_base.not_implemented_method('count')
-  drop = frame_base.not_implemented_method('drop')
   eval = frame_base.not_implemented_method('eval')
   reindex = frame_base.not_implemented_method('reindex')
   melt = frame_base.not_implemented_method('melt')
   pivot = frame_base.not_implemented_method('pivot')
   pivot_table = frame_base.not_implemented_method('pivot_table')
 
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  def explode(self, column, ignore_index):
+    # ignoring the index will not preserve it
+    preserves = (partitionings.Nothing() if ignore_index
+                 else partitionings.Singleton())
+    return frame_base.DeferredFrame.wrap(
+        expressions.ComputedExpression(
+            'explode',
+            lambda df: df.explode(column, ignore_index),
+            [self._expr],
+            preserves_partition_by=preserves,
+            requires_partition_by=partitionings.Nothing()))
+
+
+  @frame_base.args_to_kwargs(pd.DataFrame)
+  @frame_base.populate_defaults(pd.DataFrame)
+  @frame_base.maybe_inplace
+  def drop(self, **kwargs):
+    labels = kwargs.get('labels', None)

Review comment:
       One danger of using `kwargs.get` here, rather than making `labels` a parameter, is that you end up hard-coding what all the defaults are (rather than relying on `populate_defaults`).
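To illustrate the concern, here is a minimal, hypothetical sketch of a `populate_defaults`-style decorator; it is not Beam's implementation. It reads the defaults from the pandas method's signature with `inspect.signature`, so the wrapped method can declare explicit parameters instead of repeating defaults via `kwargs.get`. It assumes positional arguments were already converted to keywords (presumably what `args_to_kwargs` handles in the diff); `populate_defaults_sketch` and `FakeDeferredFrame` are illustrative names.

```py
import functools
import inspect

import pandas as pd


def populate_defaults_sketch(base_type):
  """Hypothetical decorator: fill omitted keyword arguments with the defaults
  declared on the same-named method of `base_type` (e.g. pd.DataFrame)."""
  def wrap(func):
    base_params = inspect.signature(getattr(base_type, func.__name__)).parameters
    own_params = inspect.signature(func).parameters

    @functools.wraps(func)
    def wrapper(self, **kwargs):
      for name, param in base_params.items():
        # Only fill parameters that both signatures declare and that have a
        # default on the pandas side (this skips `self`).
        if name in own_params and param.default is not inspect.Parameter.empty:
          kwargs.setdefault(name, param.default)
      return func(self, **kwargs)

    return wrapper
  return wrap


class FakeDeferredFrame:
  # Explicit parameters; their defaults come from pandas, not kwargs.get.
  @populate_defaults_sketch(pd.DataFrame)
  def drop(self, labels, axis, index, columns, level, inplace, errors):
    return dict(labels=labels, axis=axis, index=index, columns=columns,
                level=level, inplace=inplace, errors=errors)


print(FakeDeferredFrame().drop(columns=['b']))
```

If pandas ever changes a default, the sketch picks it up automatically, which is the advantage over hard-coding values in `kwargs.get` calls.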

##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
               'repeat',
               lambda series: series.str.repeat(repeats),
               [self._expr],
+              # Output will also be a str Series

Review comment:
       Is there a drawback to this being automatically inferred? (Or was it not?)
   
   Same below.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513711027



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
               'repeat',
               lambda series: series.str.repeat(repeats),
               [self._expr],
+              # Output will also be a str Series

Review comment:
       Yeah, when I added verification of the proxy to `TransformTest`, I found that this proxy was incorrectly inferred as `bool` for the zipping case tested there (where `repeats` is itself a Series).
   
   ```py
   In [10]: proxy.dtypes
   Out[10]: 
   str        object
   repeats     int64
   dtype: object
   
   In [11]: proxy.str.str.repeat(proxy.repeats)
   Out[11]: Series([], Name: str, dtype: bool)
   ```
   
   The actual operation does produce `object` though:
   ```py
   In [13]: df.str.str.repeat(df.repeats)
   Out[13]: 
   0      AAA
   1        B
   2     CCCC
   3    DDDDD
   4       EE
   Name: str, dtype: object
   ```
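A minimal sketch of this kind of check, assuming a proxy is simply an empty frame with the right columns and dtypes. `proxy_matches` is a hypothetical helper, not Beam's `TransformTest` code, and the explicit proxy mirrors the idea behind the "Output will also be a str Series" change. The dtype pandas infers for the empty `repeat` result depends on the pandas version (it was `bool` in the session above), so the sketch only prints it rather than relying on it.

```py
import pandas as pd


def proxy_matches(proxy, actual):
  """Hypothetical check: same type, plus same name/dtype for a Series or
  same columns/dtypes for a DataFrame."""
  if type(proxy) is not type(actual):
    return False
  if isinstance(actual, pd.Series):
    return proxy.name == actual.name and proxy.dtype == actual.dtype
  return (list(proxy.columns) == list(actual.columns)
          and bool((proxy.dtypes == actual.dtypes).all()))


df = pd.DataFrame({
    'str': pd.Series(['A', 'B', 'C', 'D', 'E'], dtype=object),
    'repeats': [3, 1, 4, 5, 2],
})
proxy = df.iloc[:0]  # empty frame with the same columns and dtypes


def repeat(frame):
  return frame['str'].str.repeat(frame['repeats'])


actual = repeat(df)       # object Series: AAA, B, CCCC, DDDDD, EE
inferred = repeat(proxy)  # dtype varies across pandas versions
# Patch around inference: the output is a str Series just like the input,
# so reuse the (empty) input column as the output proxy.
explicit = proxy['str']
print(inferred.dtype, proxy_matches(explicit, actual))
```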
   







[GitHub] [beam] TheNeuralBit commented on pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit commented on pull request #13139:
URL: https://github.com/apache/beam/pull/13139#issuecomment-717598637


   R: @robertwb 





[GitHub] [beam] TheNeuralBit commented on pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit commented on pull request #13139:
URL: https://github.com/apache/beam/pull/13139#issuecomment-719060222


   Windows failures look like BEAM-10921





[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513799533



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
+    labels = kwargs.get('labels', None)

Review comment:
       Ah whoops. I switched it over to use args directly







[GitHub] [beam] TheNeuralBit merged pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit merged pull request #13139:
URL: https://github.com/apache/beam/pull/13139


   





[GitHub] [beam] robertwb commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

robertwb commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r514435824



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
               'repeat',
               lambda series: series.str.repeat(repeats),
               [self._expr],
+              # Output will also be a str Series

Review comment:
       Hmm... OK, can you file a JIRA to investigate? 







[GitHub] [beam] TheNeuralBit commented on pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit commented on pull request #13139:
URL: https://github.com/apache/beam/pull/13139#issuecomment-719052448


   Run Python_PVR_Flink PreCommit





[GitHub] [beam] TheNeuralBit commented on a change in pull request #13139: [BEAM-9547] Implementation for drop, explode

TheNeuralBit commented on a change in pull request #13139:
URL: https://github.com/apache/beam/pull/13139#discussion_r513711378



##########
File path: sdks/python/apache_beam/dataframe/frames.py
##########
@@ -1513,6 +1584,8 @@ def repeat(self, repeats):
               'repeat',
               lambda series: series.str.repeat(repeats),
               [self._expr],
+              # Output will also be a str Series

Review comment:
       Maybe something we should fix upstream and patch around here in the meantime?



