Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/09/29 23:38:25 UTC

[GitHub] [beam] robertwb opened a new pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

robertwb opened a new pull request #12971:
URL: https://github.com/apache/beam/pull/12971


   Sizes are measured directly in preceding stages and estimated from there, which avoids the expensive fusion breaks caused by using the global size as a side input.
   
   A cross-partition re-batching step is added to mitigate the performance penalty of possible over-partitioning.
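   
   As a rough illustration of the idea above, the sketch below sums per-batch sizes measured upstream and turns the total into a partition count. It is not the code in this PR: the helper names `estimate_size` and `choose_num_partitions`, the byte values, and the ceiling-division rule are all assumptions made for the example, and sizes are assumed to be plain integers.

```python
def estimate_size(measured_sizes):
    """Sum per-batch sizes (in bytes) that were measured in a preceding stage."""
    return sum(measured_sizes)


def choose_num_partitions(estimated_size, target_partition_size):
    """Hypothetical rule: ceiling-divide the estimate by the target size.

    Always returns at least one partition, even for an empty input.
    """
    return max(1, -(-estimated_size // target_partition_size))


# Three upstream batches reported their own sizes; no global side input is needed.
sizes = [40_000_000, 25_000_000, 10_000_000]
estimate = estimate_size(sizes)
partitions = choose_num_partitions(estimate, target_partition_size=32_000_000)
```

   Because the estimate only needs to be right to within an order of magnitude or so, an over-estimate simply yields more partitions than necessary, which is the case the re-batching step is meant to absorb.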
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701054283


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=h1) Report
   > Merging [#12971](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **increase** coverage by `0.04%`.
   > The diff coverage is `92.50%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12971/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12971      +/-   ##
   ==========================================
   + Coverage   82.51%   82.55%   +0.04%     
   ==========================================
     Files         455      453       -2     
     Lines       54867    54772      -95     
   ==========================================
   - Hits        45272    45216      -56     
   + Misses       9595     9556      -39     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `88.46% <91.66%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.54% <92.64%> (-1.00%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/tests/bigquery\_matcher.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3Rlc3RzL2JpZ3F1ZXJ5X21hdGNoZXIucHk=) | `79.83% <0.00%> (-2.87%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.68% <0.00%> (-0.84%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/retry.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvcmV0cnkucHk=) | `86.27% <0.00%> (-0.77%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `93.38% <0.00%> (-0.74%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `90.77% <0.00%> (-0.53%)` | :arrow_down: |
   | [sdks/python/apache\_beam/metrics/metricbase.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWV0cmljcy9tZXRyaWNiYXNlLnB5) | `87.87% <0.00%> (-0.36%)` | :arrow_down: |
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `77.77% <0.00%> (-0.23%)` | :arrow_down: |
   | ... and [21 more](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=footer). Last update [6bf56f9...ec7c344](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r501970588



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Discussed this offline. I was misunderstanding the code: I thought this was summing the memory usage across _partition keys_, but `for tag, parts` is actually aggregating across the tagged inputs. The partition keys are ignored (the `_` on line 424) because they're no longer needed. At this point they've served their purpose of distributing data across workers, so it's fine to merge across them.
   
   It may make sense to set the target size to `TARGET_PARTITION_SIZE*num_inputs`, but I'm fine without it.
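   
   To make the behaviour described above concrete, here is a minimal, framework-free sketch of re-batching across partition keys. It is an illustration under stated assumptions, not the `_ReBatch` implementation from the diff: `rebatch` and `size_of` are hypothetical names, plain lists of rows stand in for dataframe parts, and row count stands in for memory usage.

```python
from collections import defaultdict
from itertools import chain


def rebatch(keyed_parts, size_of, target_size):
    """Greedily merge tagged parts across partition keys.

    keyed_parts: iterable of (partition_key, {tag: [part, ...]}) pairs.
    size_of: callable returning the size of a single part.
    Yields a {tag: merged_part} dict whenever the buffered size reaches
    target_size, plus one final dict for whatever is left over.
    """
    buffered = defaultdict(list)
    buffered_size = 0
    # The partition key is deliberately ignored: it was only needed to
    # distribute the data across workers.
    for _, tagged_parts in keyed_parts:
        for tag, parts in tagged_parts.items():
            buffered[tag].extend(parts)
            # The running size is summed over all tagged inputs together.
            buffered_size += sum(size_of(part) for part in parts)
        if buffered_size >= target_size:
            yield {t: list(chain.from_iterable(ps)) for t, ps in buffered.items()}
            buffered.clear()
            buffered_size = 0
    if buffered:
        yield {t: list(chain.from_iterable(ps)) for t, ps in buffered.items()}


keyed = [
    (0, {'a': [[1, 2]], 'b': [[10]]}),
    (1, {'a': [[3]], 'b': [[20, 30]]}),
    (2, {'a': [[4]]}),
]
batches = list(rebatch(keyed, size_of=len, target_size=6))
```

   Since the running size covers every tagged input, scaling the threshold by the number of inputs (the `TARGET_PARTITION_SIZE*num_inputs` suggestion) would keep the per-input batch size near the target; in this sketch that would just mean passing a larger `target_size`.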







[GitHub] [beam] codecov[bot] edited a comment on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701054283


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=h1) Report
   > Merging [#12971](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/cee7388365970774969e63fe90fdf3cdd804e79f?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `92.50%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12971/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12971      +/-   ##
   ==========================================
   - Coverage   82.34%   82.34%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54650    54718      +68     
   ==========================================
   + Hits        45002    45056      +54     
   - Misses       9648     9662      +14     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `88.46% <91.66%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.54% <92.64%> (-1.00%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.68% <0.00%> (-1.23%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `90.90% <0.00%> (-0.76%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `80.11% <0.00%> (-0.60%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.98% <0.00%> (-0.17%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.58% <0.00%> (+0.13%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/doctests.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2RvY3Rlc3RzLnB5) | `97.60% <0.00%> (+0.39%)` | :arrow_up: |
   | [sdks/python/apache\_beam/utils/timestamp.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvdGltZXN0YW1wLnB5) | `95.93% <0.00%> (+0.50%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=footer). Last update [cee7388...8143608](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701054283


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=h1) Report
   > Merging [#12971](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/cee7388365970774969e63fe90fdf3cdd804e79f?el=desc) will **increase** coverage by `0.20%`.
   > The diff coverage is `92.50%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12971/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12971      +/-   ##
   ==========================================
   + Coverage   82.34%   82.55%   +0.20%     
   ==========================================
     Files         455      453       -2     
     Lines       54650    54772     +122     
   ==========================================
   + Hits        45002    45216     +214     
   + Misses       9648     9556      -92     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `88.46% <91.66%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.54% <92.64%> (-1.00%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.68% <0.00%> (-1.23%)` | :arrow_down: |
   | [...dks/python/apache\_beam/options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vb3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `93.76% <0.00%> (-1.03%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `93.38% <0.00%> (-0.74%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `80.23% <0.00%> (-0.48%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/doctests.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2RvY3Rlc3RzLnB5) | `96.75% <0.00%> (-0.45%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `88.35% <0.00%> (-0.15%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/fileio.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZWlvLnB5) | `95.80% <0.00%> (-0.05%)` | :arrow_down: |
   | ... and [17 more](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=footer). Last update [cee7388...ab11efd](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   








[GitHub] [beam] codecov[bot] edited a comment on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701054283


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=h1) Report
   > Merging [#12971](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/cee7388365970774969e63fe90fdf3cdd804e79f?el=desc) will **increase** coverage by `0.20%`.
   > The diff coverage is `92.50%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12971/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12971      +/-   ##
   ==========================================
   + Coverage   82.34%   82.55%   +0.20%     
   ==========================================
     Files         455      453       -2     
     Lines       54650    54772     +122     
   ==========================================
   + Hits        45002    45216     +214     
   + Misses       9648     9556      -92     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `88.46% <91.66%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.54% <92.64%> (-1.00%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.68% <0.00%> (-1.23%)` | :arrow_down: |
   | [...dks/python/apache\_beam/options/pipeline\_options.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vb3B0aW9ucy9waXBlbGluZV9vcHRpb25zLnB5) | `93.76% <0.00%> (-1.03%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `93.38% <0.00%> (-0.74%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `80.23% <0.00%> (-0.48%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/doctests.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2RvY3Rlc3RzLnB5) | `96.75% <0.00%> (-0.45%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `88.35% <0.00%> (-0.15%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/fileio.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZmlsZWlvLnB5) | `95.80% <0.00%> (-0.05%)` | :arrow_down: |
   | ... and [17 more](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=footer). Last update [cee7388...ab11efd](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] TheNeuralBit commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r501261972



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
       This assumes that the size of this stage's output equals the sum of the sizes of its inputs, right? Could you document that here?

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.

Review comment:
       Did you do some benchmarks that motivated this?

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Should this actually be `TARGET_PARTITION_SIZE*num_partitions` (I believe that's possible), since it's the data limit across every partition?

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       I'm curious whether it's always beneficial to use `pd.concat` here. I was under the impression that it copies and re-arranges buffers into columns.







[GitHub] [beam] TheNeuralBit commented on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-703008685


   Sorry @robertwb I wasn't able to get to this or your corr/cov PR today and I'll be OOO Monday and Tuesday. I'd like to take a look when I'm back on Wednesday though if you don't mind waiting.





[GitHub] [beam] robertwb commented on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
robertwb commented on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701048994


   R: @TheNeuralBit 





[GitHub] [beam] robertwb commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r502090932



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
       Not quite. We want the size of the inputs to partition the inputs; we don't care about the output size of this stage at all. Clarified in the comment above. 
   
   Note also that these "stages" are not the same as fused executable "stages." In particular, these "stages" contain a (Co)GBK along with some operations that follow it.
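
A minimal sketch of the sizing rule discussed in this comment, in plain Python rather than Beam: the measured sizes of the *inputs* are summed and turned into a partition count. `TARGET_PARTITION_SIZE` and `DEFAULT_PARTITIONS` are named in the diff, but the values below are assumed, and `choose_num_partitions` with its ceiling-division rule is hypothetical, since the diff does not show how the summed size becomes a count.

```python
import math

# Assumed values for illustration; the diff does not show the real constants.
TARGET_PARTITION_SIZE = 1 << 20  # bytes
DEFAULT_PARTITIONS = 100


def choose_num_partitions(input_sizes):
  """Picks a partition count from per-input size estimates.

  input_sizes has one entry per tabular input: either a list of byte counts
  measured upstream, or None if no cheap estimate is available.
  """
  if any(sizes is None for sizes in input_sizes):
    # No cheap estimate for at least one input: use the fixed default.
    return DEFAULT_PARTITIONS
  total = sum(sum(sizes) for sizes in input_sizes)
  # Hypothetical rule: enough partitions that each receives roughly
  # TARGET_PARTITION_SIZE bytes of input, and never fewer than one.
  return max(1, math.ceil(total / TARGET_PARTITION_SIZE))
```

Only the input sizes appear in the calculation; the size of whatever the stage produces plays no role.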

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Having thought about it, I'm going to leave it for now. The reason to have a bound is to limit the total amount of memory used on a worker (and the amount of compute coupled into downstream operations), so it makes sense to cap across inputs rather than per input. We could revisit this in the future.

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.

Review comment:
       Mostly the observation that there were a plethora of tiny/empty dataframes when debugging, and the realization that this could be worse given the dynamic partitioning choices (which err on the side of overestimation). 
   
   I also ran some simple benchmarks and determined that, for simple operations, runtime started to scale linearly with input size at around the MB range.
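
One way to reproduce this kind of measurement is a small timing harness like the one below. It is a sketch, not the benchmark that was actually run: `seconds_per_byte` is a hypothetical helper, and the stand-in workload in the usage lines (`sum` over a `bytes` object) would be replaced with a pandas operation applied to dataframes of various sizes.

```python
import timeit


def seconds_per_byte(op, make_input, sizes, repeat=5):
  """Times op on inputs of each size and returns the best time per byte.

  op: callable applied to one input.
  make_input: callable building an input of roughly the given byte size.
  sizes: iterable of input sizes in bytes.
  """
  results = {}
  for size in sizes:
    data = make_input(size)
    # Take the fastest of several runs to reduce scheduling noise.
    best = min(timeit.repeat(lambda: op(data), number=1, repeat=repeat))
    results[size] = best / size
  return results


# Stand-in workload; swap in a dataframe operation to measure pandas.
costs = seconds_per_byte(sum, lambda n: bytes(n), [1_000, 100_000])
for size, cost in sorted(costs.items()):
  print(size, cost)
```

If the per-byte cost stops falling once inputs reach a certain size, fixed per-call overhead no longer dominates there, which is the regime a target partition size is meant to reach.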

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    return float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       How beneficial it is depends on the size of the inputs. For tiny inputs, the cost of copying is easily absorbed in the overhead savings of future operations, but for large inputs it could dominate. It may make sense to try to tune TARGET_PARTITION_SIZE to find the right tradeoff. 







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r501970588



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Discussed this offline. I was misunderstanding the code: I thought this was summing the memory usage across _partition keys_, but `for tag, parts` is actually aggregating across the tagged inputs. The partition keys are ignored (the `_` on line 424) because they're no longer needed. At this point they've served their purpose of distributing data across workers, so it's fine to merge across them.
   
   It may make sense to set the target size to `TARGET_PARTITION_SIZE*num_inputs`, but I'm fine without it.
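
The merging behaviour described in this comment can be sketched without Beam or pandas. The following is a simplified model, not the PR's `_ReBatch`: `rebatch` and `size_of` are hypothetical names, strings stand in for dataframe parts (with `len` as their size), and windows and timestamps are left out. It shows the partition key being dropped and a single running size shared by all tagged inputs.

```python
from collections import defaultdict


def rebatch(elements, target_size, size_of=len):
    """Merge (partition_key, {tag: [part, ...]}) elements into larger batches.

    The partition key is ignored, and one running size is shared by all
    tags, so the cap applies to the batch as a whole, not to each input.
    """
    pending = defaultdict(list)
    running_size = 0
    for _key, tagged_parts in elements:
        for tag, parts in tagged_parts.items():
            running_size += sum(size_of(part) for part in parts)
            pending[tag].extend(parts)
        if running_size >= target_size:
            # Enough data accumulated: emit one merged batch and start over.
            yield dict(pending)
            pending = defaultdict(list)
            running_size = 0
    if pending:
        # Emit whatever is left when the input is exhausted.
        yield dict(pending)


elements = [
    ("k1", {"left": ["aa"], "right": ["b"]}),
    ("k2", {"left": ["cc"]}),
    ("k3", {"right": ["dddd"]}),
]
batches = list(rebatch(elements, target_size=5))
```

With these inputs the first two keys are merged into one batch once the shared size reaches the target, and the third key ends up in a second batch. Scaling the target by the number of inputs, as suggested above, would only change the `target_size` argument.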

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       I'm curious whether it's always beneficial to use `pd.concat` here. I was under the impression that it copies and re-arranges buffers into columns.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] codecov[bot] edited a comment on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701054283


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=h1) Report
   > Merging [#12971](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/cee7388365970774969e63fe90fdf3cdd804e79f?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `92.50%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/12971/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12971      +/-   ##
   ==========================================
   - Coverage   82.34%   82.34%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54650    54718      +68     
   ==========================================
   + Hits        45002    45056      +54     
   - Misses       9648     9662      +14     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `88.46% <91.66%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.54% <92.64%> (-1.00%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.68% <0.00%> (-1.23%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/localfilesystem.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vbG9jYWxmaWxlc3lzdGVtLnB5) | `90.90% <0.00%> (-0.76%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `80.11% <0.00%> (-0.60%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.98% <0.00%> (-0.17%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.58% <0.00%> (+0.13%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/doctests.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2RvY3Rlc3RzLnB5) | `97.60% <0.00%> (+0.39%)` | :arrow_up: |
   | [sdks/python/apache\_beam/utils/timestamp.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvdGltZXN0YW1wLnB5) | `95.93% <0.00%> (+0.50%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=footer). Last update [cee7388...ab11efd](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] robertwb commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r502090932



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
       Not quite. We want the size of the inputs to partition the inputs; we don't care about the output size of this stage at all. Clarified in the comment above. 
   
   Note also that these "stages" are not the same as fused executable "stages." In particular, these "stages" contain a (Co)GBK along with some operations that follow it.
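
As a rough illustration of sizing the partitioning from the inputs, here is a minimal sketch using plain integers. In the PR the sizes are PCollections summed inside the pipeline and consumed as a side input; that part is not modelled here. The constant values and the ceiling-division rule are assumptions made for the example, since the thread does not show them, and `choose_num_partitions` is a hypothetical helper name.

```python
import math

# Hypothetical values for illustration only; the PR's actual constants
# are not shown in this thread.
TARGET_PARTITION_SIZE = 8 * 1024 * 1024  # bytes
DEFAULT_PARTITIONS = 100


def choose_num_partitions(input_sizes):
    """Pick a partition count from per-input size estimates (in bytes).

    A `None` entry marks an input whose size could not be estimated
    cheaply, in which case a fixed default is used.
    """
    if any(size is None for size in input_sizes):
        return DEFAULT_PARTITIONS
    total = sum(input_sizes)
    # Assumed rule: enough partitions that each holds about one target's worth.
    return max(1, math.ceil(total / TARGET_PARTITION_SIZE))
```

Only the input sizes appear in the calculation; the size of the stage's output is never consulted, which is the point made in the comment above.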

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):

Review comment:
       Having thought about it, I'm going to leave it for now. The reason to have a bound is to bound the total amount of memory on a worker (and amount of compute to couple in downstream operations), and that makes sense to cap across inputs rather than per-input. We could revisit in the future. 

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.

Review comment:
       Mostly the observation that there were a plethora of tiny/empty dataframes when debugging, and the realization that this could be worse given the dynamic partitioning choices (which err on the side of overestimation). 
   
   I also ran some simple benchmarks and determined that, for simple operations, the cost started to scale linearly with size at around the MB range.

##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       How beneficial it is depends on the size of the inputs. For tiny inputs, the cost of copying is easily absorbed in the overhead savings of future operations, but for large inputs it could dominate. It may make sense to try to tune TARGET_PARTITION_SIZE to find the right tradeoff. 
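
The copying behind this tradeoff can be observed directly. The sketch below is not part of the PR: `total_memory_usage` is a hypothetical simplification of the quoted `_total_memory_usage` (unlike the quoted hunk, it returns the infinity fallback explicitly), and the two tiny frames are made-up inputs. It checks that the result of `pd.concat` over several parts does not share buffers with them, which is why the cost grows with the size of the parts.

```python
import numpy as np
import pandas as pd


def total_memory_usage(frame):
    """Bytes used by a pandas object, or +inf when it cannot be measured."""
    try:
        size = frame.memory_usage()
    except AttributeError:
        return float("inf")
    # DataFrame.memory_usage() returns a per-column Series;
    # Series and Index return a plain integer.
    return int(size.sum()) if hasattr(size, "sum") else int(size)


parts = [pd.DataFrame({"a": np.arange(3, dtype="int64")}) for _ in range(2)]
merged = pd.concat(parts)

# True if the merged column still points at any input part's buffer.
shares_buffers = any(
    np.shares_memory(merged["a"].to_numpy(), part["a"].to_numpy())
    for part in parts
)
```

For parts this small the copy is negligible next to per-operation overhead; timing the same check on parts near the target size would be one way to tune `TARGET_PARTITION_SIZE`.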







[GitHub] [beam] robertwb merged pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
robertwb merged pull request #12971:
URL: https://github.com/apache/beam/pull/12971


   








[GitHub] [beam] TheNeuralBit commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r501273437



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -321,17 +352,100 @@ def expr_to_pcoll(expr):
       else:
         return stage_to_result(expr_to_stage(expr))[expr._id]
 
+    @memoize
+    def estimate_size(expr, same_stage_ok):
+      # Returns a pcollection of ints whose sum is the estimated size of the
+      # given expression.
+      pipeline = next(iter(inputs.values())).pipeline
+      label = 'Size[%s, %s]' % (expr._id, same_stage_ok)
+      if is_scalar(expr):
+        return pipeline | label >> beam.Create([0])
+      elif same_stage_ok:
+        return expr_to_pcoll(expr) | label >> beam.Map(_total_memory_usage)
+      elif expr in inputs:
+        return None
+      else:
+        # This is the stage to avoid.
+        expr_stage = expr_to_stage(expr)
+        # If the stage doesn't start with a shuffle, it's not safe to fuse
+        # the computation into its parent either.
+        has_shuffle = expr_stage.partitioning != partitionings.Nothing()
+        # We assume the size of an expression is the sum of the size of its
+        # inputs, which may be off by quite a bit, but the goal is to get
+        # within an order of magnitude or two.
+        arg_sizes = []
+        for arg in expr.args():
+          if is_scalar(arg):
+            continue
+          elif arg in inputs:
+            return None
+          arg_size = estimate_size(
+              arg,
+              same_stage_ok=has_shuffle and expr_to_stage(arg) != expr_stage)
+          if arg_size is None:
+            return None
+          arg_sizes.append(arg_size)
+        return arg_sizes | label >> beam.Flatten(pipeline=pipeline)
+
     # Now we can compute and return the result.
     return {k: expr_to_pcoll(expr) for k, expr in outputs.items()}
 
 
+def _total_memory_usage(frame):
+  assert isinstance(frame, (pd.core.generic.NDFrame, pd.Index))
+  try:
+    size = frame.memory_usage()
+    if not isinstance(size, int):
+      size = size.sum()
+    return size
+  except AttributeError:
+    # Don't know, assume it's really big.
+    float('inf')
+
+
+class _ReBatch(beam.DoFn):
+  """Groups all the parts from various workers into the same dataframe.
+
+  Also groups across partitions, up to a given data size, to recover some
+  efficiency in the face of over-partitioning.
+  """
+  def __init__(self, target_size=TARGET_PARTITION_SIZE):
+    self._target_size = target_size
+
+  def start_bundle(self):
+    self._parts = collections.defaultdict(lambda: collections.defaultdict(list))
+    self._running_size = 0
+
+  def process(
+      self,
+      element,
+      window=beam.DoFn.WindowParam,
+      timestamp=beam.DoFn.TimestampParam):
+    _, tagged_parts = element
+    for tag, parts in tagged_parts.items():
+      for part in parts:
+        self._running_size += _total_memory_usage(part)
+      self._parts[window, timestamp][tag].extend(parts)
+    if self._running_size >= self._target_size:
+      self.finish_bundle()
+
+  def finish_bundle(self):
+    for (window, timestamp), tagged_parts in self._parts.items():
+      yield windowed_value.WindowedValue(
+          {tag: pd.concat(parts)

Review comment:
       I'm curious whether it's always beneficial to use `pd.concat` here. I was under the impression that it copies and re-arranges buffers into columns.







[GitHub] [beam] TheNeuralBit commented on a change in pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
TheNeuralBit commented on a change in pull request #12971:
URL: https://github.com/apache/beam/pull/12971#discussion_r502557533



##########
File path: sdks/python/apache_beam/dataframe/transforms.py
##########
@@ -166,16 +173,40 @@ def expand(self, pcolls):
           partitioned_pcoll = next(pcolls.values()).pipeline | beam.Create([{}])
 
         elif self.stage.partitioning != partitionings.Nothing():
+          # Partitioning required for these operations.
+          # Compute the number of partitions to use based on estimated size.
+          if self.stage.partitioning == partitionings.Singleton():
+            # Always a single partition, don't waste time computing sizes.
+            num_partitions = 1
+          else:
+            # Estimate the sizes from the outputs of a *previous* stage such
+            # that using these estimates will not cause a fusion break.
+            input_sizes = [
+                estimate_size(input, same_stage_ok=False)
+                for input in tabular_inputs
+            ]
+            if None in input_sizes:
+              # We were unable to (cheaply) compute the size of one or more
+              # inputs.
+              num_partitions = DEFAULT_PARTITIONS
+            else:
+              num_partitions = beam.pvalue.AsSingleton(
+                  input_sizes
+                  | 'FlattenSizes' >> beam.Flatten()
+                  | 'SumSizes' >> beam.CombineGlobally(sum)

Review comment:
       The reason I mentioned the outputs of this stage is that it looks like the objective is for the output partition sizes to be equal to `TARGET_PARTITION_SIZE` iff the size of each output is equal to the sum of the sizes of the inputs. It looks like you do have a comment to that effect in `estimate_size`.







[GitHub] [beam] codecov[bot] edited a comment on pull request #12971: [BEAM-10988] Partition dataframes according to size estimates.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #12971:
URL: https://github.com/apache/beam/pull/12971#issuecomment-701054283


   # [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=h1) Report
   > Merging [#12971](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **increase** coverage by `0.04%`.
   > The diff coverage is `92.50%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #12971      +/-   ##
   ==========================================
   + Coverage   82.51%   82.55%   +0.04%     
   ==========================================
     Files         455      453       -2     
     Lines       54867    54772      -95     
   ==========================================
   - Hits        45272    45216      -56     
   + Misses       9595     9556      -39     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `88.46% <91.66%> (-0.58%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.54% <92.64%> (-1.00%)` | :arrow_down: |
   | [...ython/apache\_beam/io/gcp/tests/bigquery\_matcher.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL3Rlc3RzL2JpZ3F1ZXJ5X21hdGNoZXIucHk=) | `79.83% <0.00%> (-2.87%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.68% <0.00%> (-0.84%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/retry.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvcmV0cnkucHk=) | `86.27% <0.00%> (-0.77%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/direct/test\_stream\_impl.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvdGVzdF9zdHJlYW1faW1wbC5weQ==) | `93.38% <0.00%> (-0.74%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `90.77% <0.00%> (-0.53%)` | :arrow_down: |
   | [sdks/python/apache\_beam/metrics/metricbase.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWV0cmljcy9tZXRyaWNiYXNlLnB5) | `87.87% <0.00%> (-0.36%)` | :arrow_down: |
   | [...python/apache\_beam/runners/worker/worker\_status.py](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvd29ya2VyX3N0YXR1cy5weQ==) | `77.77% <0.00%> (-0.23%)` | :arrow_down: |
   | ... and [21 more](https://codecov.io/gh/apache/beam/pull/12971/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=footer). Last update [6bf56f9...ec7c344](https://codecov.io/gh/apache/beam/pull/12971?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

