Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/23 04:36:21 UTC

[GitHub] [beam] iindyk opened a new pull request #13175: Adding Cythonization and other performance improvements to Approximat…

iindyk opened a new pull request #13175:
URL: https://github.com/apache/beam/pull/13175


   …eQuantiles. Adding option to process batches of elements.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-754680115


   @aaltay could you recommend someone to take a look at this PR please?





[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (59b4d6a) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.52%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   83.01%   +0.52%     
   ==========================================
     Files         455      469      +14     
     Lines       54876    58331    +3455     
   ==========================================
   + Hits        45266    48425    +3159     
   - Misses       9610     9906     +296     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.07% <ø> (-0.22%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.07% <ø> (-4.34%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [147 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...59b4d6a](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
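
The percentages in the coverage diff above follow from the Hits and Lines rows. The sketch below reproduces them, assuming (this is an inference from the numbers, not something the report states) that the figures are truncated rather than rounded to two decimals. The helper name `truncate2` is hypothetical.

```python
import math


def truncate2(x):
    """Truncate a non-negative number to two decimals (assumed display rule)."""
    return math.floor(x * 100) / 100


# Hits and Lines rows from the report above.
master_hits, master_lines = 45266, 54876
pr_hits, pr_lines = 48425, 58331

master_cov = master_hits / master_lines * 100
pr_cov = pr_hits / pr_lines * 100

print(truncate2(master_cov), truncate2(pr_cov), truncate2(pr_cov - master_cov))
```

The Misses row is simply Lines minus Hits in each column.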
   





[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-724053075


   friendly ping @robertwb 





[GitHub] [beam] aaltay commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-746799549


   @robertwb - could you take another look at this PR please?





[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-717978836


   retest this please





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716476



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       I don't think this would cause any problems with Cythonization or performance. They will be static methods, though, so the only difference is the namespace, and neither of them deals with `_QuantileState` objects. I don't have a strong preference either way. WDYT?
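
For readers following the hunk above, the docstring of the removed `_interpolate` describes picking the `(k * step + offset)`-th elements from the weighted, ordered union of the buffers. A minimal standalone sketch of that idea follows. It is not the PR's code: the function name, the per-buffer `weights` argument, and the assumption that every buffer is already sorted and that at least one buffer is non-empty are all illustrative.

```python
import heapq


def interpolate_sketch(buffers, weights, count, step, offset):
    """Pick the (k * step + offset)-th elements, 0 <= k < count, from the
    ordered union of `buffers`, where each element of buffers[i] counts
    weights[i] times. Hypothetical helper, not part of Beam."""
    # Pair every element with the weight of its buffer.
    streams = [[(x, w) for x in buf] for buf, w in zip(buffers, weights)]
    # heapq.merge lazily merges the already-sorted streams.
    merged = heapq.merge(*streams, key=lambda pair: pair[0])
    value, current = next(merged)  # `current` is the cumulative weight so far
    picked = []
    for k in range(count):
        target = k * step + offset
        try:
            # Advance until the cumulative weight passes the target position.
            while current <= target:
                value, weight = next(merged)
                current += weight
        except StopIteration:
            pass  # ran out of elements: keep returning the last one
        picked.append(value)
    return picked


print(interpolate_sketch([[1, 3, 5], [2, 4, 6]], [1, 1], count=3, step=2, offset=1))
```

With unit weights the call above selects positions 1, 3 and 5 (zero-based) of the merged sequence `1, 2, 3, 4, 5, 6`.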







[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-760293944


   The algorithm remains as described in the paper. These changes concern the implementation rather than the algorithm itself.
   The core change here is the Cythonization, which has to come in one piece (the other optimizations are few-line changes and are interconnected). Cythonization requires moving many things out of ApproximateQuantilesCombineFn (it is not an extension class and cannot be Cythonized), and that makes up the bulk of this change. So my feeling is that splitting the PR would not make it easier to review.
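
The restructuring described above can be pictured roughly as follows. This is a simplified sketch, not the PR's code: the class names, the `buffer_size` parameter, and the buffering rule are hypothetical, and the real `_QuantileState` takes different arguments (see the diff in this thread). The point is only the shape: per-element work lives in a plain state class that could be declared as an extension type, for example through an accompanying `.pxd` file, while the combine function just forwards calls.

```python
class QuantileStateSketch:
    """Hypothetical state holder; a candidate for a Cython extension type."""

    def __init__(self, buffer_size):
        self.buffer_size = buffer_size
        self.unbuffered_elements = []
        self.buffers = []

    def add_unbuffered(self, elements):
        # All per-element work happens here, outside the combine function.
        self.unbuffered_elements.extend(elements)
        while len(self.unbuffered_elements) >= self.buffer_size:
            full = sorted(self.unbuffered_elements[:self.buffer_size])
            del self.unbuffered_elements[:self.buffer_size]
            self.buffers.append(full)


class CombineFnSketch:
    """Thin wrapper that only delegates to the state object."""

    def __init__(self, buffer_size):
        self._buffer_size = buffer_size

    def create_accumulator(self):
        return QuantileStateSketch(self._buffer_size)

    def add_input(self, state, element):
        state.add_unbuffered([element])
        return state


fn = CombineFnSketch(buffer_size=3)
acc = fn.create_accumulator()
for x in [5, 1, 4, 2]:
    acc = fn.add_input(acc, x)
print(acc.buffers, acc.unbuffered_elements)
```

Keeping the wrapper this thin means the hot path never touches attributes of the combine function itself, which is what lets the state class be compiled on its own.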





[GitHub] [beam] tvalentyn commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768593897


   retest this please





[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (8fff438) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.30%`.
   > The diff coverage is `81.81%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   82.78%   +0.30%     
   ==========================================
     Files         455      466      +11     
     Lines       54876    57589    +2713     
   ==========================================
   + Hits        45266    47677    +2411     
   - Misses       9610     9912     +302     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.97% <ø> (+0.67%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.11% <ø> (-4.30%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [137 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...06806b0](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   








[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r578893392



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
       num_buffers,  # type: int
       key=None,
       reverse=False,
-      weighted=False):
-    def _comparator(a, b):
-      if key:
-        a, b = key(a), key(b)
-
-      retval = int(a > b) - int(a < b)
-
-      if reverse:
-        return -retval
-
-      return retval
-
-    self._comparator = _comparator
-
+      weighted=False,
+      batch_input=False):
     self._num_quantiles = num_quantiles
-    self._buffer_size = buffer_size
-    self._num_buffers = num_buffers
-    if weighted:
-      self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
-    else:
-      self._key = key
-    self._reverse = reverse
-    self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, reverse)
+    self._batch_input = batch_input
+    if self._batch_input:
+      setattr(self, 'add_input', self._add_inputs)

Review comment:
       Thanks. Looks like this is https://github.com/python/mypy/issues/2427.
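For context, the `setattr` call in the hunk above rebinds `add_input` on the instance. As far as I understand the linked mypy issue, a plain `self.add_input = self._add_inputs` assignment behaves the same at runtime but is what mypy complains about. A minimal sketch of the pattern, assuming a hypothetical `ToyCombiner` class that is not the PR's `ApproximateQuantilesCombineFn`:

```python
class ToyCombiner(object):
  """Hypothetical stand-in; not the Beam CombineFn from the PR."""
  def __init__(self, batch_input=False):
    self._batch_input = batch_input
    if self._batch_input:
      # Rebind on the instance so callers keep using `add_input`.
      # setattr sidesteps the static "cannot assign to a method" report.
      setattr(self, 'add_input', self._add_inputs)

  def add_input(self, acc, element):
    # Default path: one element per call.
    acc.append(element)
    return acc

  def _add_inputs(self, acc, elements):
    # Batched path: each input is itself a list of elements.
    acc.extend(elements)
    return acc


single = ToyCombiner().add_input([], 1)
batched = ToyCombiner(batch_input=True).add_input([], [1, 2, 3])
```

The rebinding happens once in the constructor, so the per-element path carries no extra branch on `batch_input`.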







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570720343



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       Should I make it a dependency then?
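For reference, the md5 fallback in the hunk above can be checked in isolation. The sketch below only reuses the truncation expression from the diff; the function name `md5_fingerprint64` and the sample input are illustrative, not part of the PR:

```python
import hashlib


def md5_fingerprint64(value):
  # type: (bytes) -> int
  """64-bit fingerprint: the first 16 hex digits of the md5 digest."""
  return int(hashlib.md5(value).hexdigest()[:16], 16)


fp = md5_fingerprint64(b'beam')
# Equivalent view: the first 8 digest bytes read as a big-endian integer.
same = int.from_bytes(hashlib.md5(b'beam').digest()[:8], 'big')
```

Both expressions give the same unsigned value below 2**64, which is the range the `mmh3.hash64(..., signed=False)[0]` branch is described as returning.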







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716679



##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       1. I think the tests use DirectRunner, so probably not.
   2. The approximation is properly exercised only if either the number of inputs is large with the default settings, or `max_num_elements` and `epsilon` are set to extremely low and extremely large values, respectively. I tested the approximation with a large number of inputs on FlumeCppRunner during development, but it took a while to complete, so it's probably not suitable for continuous testing. It might make sense for me to initialize the CombineFn with the extreme values and test `add_input`, `merge_accumulators` and `extract_output` directly. WDYT?
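The idea of testing the combiner methods directly, without a runner, could look roughly like the sketch below. `ToyQuantilesFn` and `run_directly` are hypothetical names; the toy class computes exact quantiles and only mirrors the CombineFn method names, so it says nothing about the PR's approximation error:

```python
class ToyQuantilesFn(object):
  """Hypothetical stand-in with CombineFn-style methods (exact quantiles)."""
  def __init__(self, num_quantiles):
    # Assumes num_quantiles >= 2 and a non-empty input.
    self._num_quantiles = num_quantiles

  def create_accumulator(self):
    return []

  def add_input(self, acc, element):
    acc.append(element)
    return acc

  def merge_accumulators(self, accs):
    merged = []
    for acc in accs:
      merged.extend(acc)
    return merged

  def extract_output(self, acc):
    ordered = sorted(acc)
    last = len(ordered) - 1
    steps = self._num_quantiles - 1
    # Evenly spaced ranks from the minimum to the maximum.
    return [ordered[(k * last) // steps] for k in range(self._num_quantiles)]


def run_directly(fn, shards):
  """Drives the combiner by hand: one accumulator per shard, then a merge."""
  accs = []
  for shard in shards:
    acc = fn.create_accumulator()
    for element in shard:
      acc = fn.add_input(acc, element)
    accs.append(acc)
  return fn.extract_output(fn.merge_accumulators(accs))


quantiles = run_directly(ToyQuantilesFn(3), [range(0, 50), range(50, 101)])
```

With the real CombineFn in place of the toy class, the same driver would exercise the collapse and merge paths with small buffer settings instead of a large input.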







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r521438895



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):

Review comment:
       When I inherit as `_QuantileBuffer(object, Generic[T])`, I get
   ```
   TypeError: Cannot create a consistent method resolution order (MRO) for bases object, Generic
   ```
   without Cython. When I use `_QuantileBuffer(Generic[T], object)` instead, it works in pure Python, but with Cythonization I get
   ```
   First base of '_QuantileBuffer' is not an extension type.
   ```
   and
   ```
   Only one extension type base class allowed.
   ```
   Am I missing something?







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r574970079



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       Sounds good.







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r514364188



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))

Review comment:
       It's not, unfortunately. It's about 30% slower in `add_input` (for batched inputs) and slightly more than that in `merge_accumulators`.










[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r514366171



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):
+    # type: (List, List, bool, int, Any, Any) -> None
     self.elements = elements
-    self.weighted = weighted
+    self.weights = weights
     self.level = level
-    self.weight = weight
-
-  def __lt__(self, other):
-    if self.weighted:
-      return [element[0] for element in self.elements
-              ] < [element[0] for element in other.elements]
+    if min_val is None or max_val is None:
+      # Buffer is always initialized with sorted elements.
+      self.min_val = elements[0]
+      self.max_val = elements[-1]
     else:
-      return self.elements < other.elements
-
-  def sized_iterator(self):
-    class QuantileBufferIterator(object):
-      def __init__(self, elem, weighted, weight):
-        self._iter = iter(elem)
-        self.weighted = weighted
-        self.weight = weight
-
-      def __iter__(self):
-        return self
+      # Note that collapsed buffer may not contain min and max in the list of
+      # elements.
+      self.min_val = min_val
+      self.max_val = max_val
+    self._iter = zip(
+        self.elements,
+        self.weights if weighted else itertools.repeat(self.weights[0]))
 
-      def __next__(self):
-        if self.weighted:
-          return next(self._iter)
-        else:
-          value = next(self._iter)
-          return (value, self.weight)
+  def __iter__(self):
+    return self._iter

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):
+    # type: (List, List, bool, int, Any, Any) -> None
     self.elements = elements
-    self.weighted = weighted
+    self.weights = weights
     self.level = level
-    self.weight = weight
-
-  def __lt__(self, other):
-    if self.weighted:
-      return [element[0] for element in self.elements
-              ] < [element[0] for element in other.elements]
+    if min_val is None or max_val is None:
+      # Buffer is always initialized with sorted elements.
+      self.min_val = elements[0]
+      self.max_val = elements[-1]
     else:
-      return self.elements < other.elements
-
-  def sized_iterator(self):
-    class QuantileBufferIterator(object):
-      def __init__(self, elem, weighted, weight):
-        self._iter = iter(elem)
-        self.weighted = weighted
-        self.weight = weight
-
-      def __iter__(self):
-        return self
+      # Note that collapsed buffer may not contain min and max in the list of
+      # elements.
+      self.min_val = min_val
+      self.max_val = max_val
+    self._iter = zip(
+        self.elements,
+        self.weights if weighted else itertools.repeat(self.weights[0]))
 
-      def __next__(self):
-        if self.weighted:
-          return next(self._iter)
-        else:
-          value = next(self._iter)
-          return (value, self.weight)
+  def __iter__(self):
+    return self._iter
 
-      next = __next__  # For Python 2
+  def __next__(self):

Review comment:
       Done.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] iindyk commented on pull request #13175: Adding Cythonization and other performance improvements to Approximat…

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-717296554


   fn_runner_test seems to be failing, but I think it's unrelated to this PR.





[GitHub] [beam] aaltay commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-754773796


   R: @tvalentyn - Would you have some time in the next few weeks to review this PR?





[GitHub] [beam] robertwb commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r513689796



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:

Review comment:
       Nit: it'd be easier to read `if reverse and key is None` rather than having the extra negation in there. 
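
A minimal sketch of the branch ordering suggested in this nit, assuming the four cases stay as they are in the diff. The helper name `make_less_than` is hypothetical and only serves the illustration; it is not part of the PR.

```python
def make_less_than(key=None, reverse=False):
  """Returns a two-argument comparison honoring `key` and `reverse`.

  Hypothetical helper: the branches test `reverse` positively, so no
  condition needs a `not`.
  """
  if reverse and key is None:
    return lambda a, b: a > b
  elif key is None:
    return lambda a, b: a < b
  elif reverse:
    return lambda a, b: key(a) > key(b)
  else:
    return lambda a, b: key(a) < key(b)
```

The four branches cover the same combinations of `key` and `reverse` as the original; only the order and the polarity of the tests change.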

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       Are there downsides to just making this a dependency? 

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))

Review comment:
       I'm curious if it's faster to always have weights (by default 1) than introducing this indirection everywhere. 
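
To make the alternative in this comment concrete, here is a rough sketch of always carrying weights, with a default weight of 1 for unweighted input. The function name `normalize_input` is an assumption made for illustration, and the sketch says nothing about which approach is actually faster; that would need a benchmark.

```python
def normalize_input(elements, weights=None):
  """Returns (elements, weights) lists, filling in weight 1 when absent.

  Hypothetical helper: downstream code can then treat every input as
  weighted and needs no separate unweighted path.
  """
  elements = list(elements)
  if weights is None:
    weights = [1] * len(elements)
  else:
    weights = list(weights)
  if len(weights) != len(elements):
    raise ValueError('elements and weights must have the same length')
  return elements, weights
```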

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):
+    # type: (List, List, bool, int, Any, Any) -> None
     self.elements = elements
-    self.weighted = weighted
+    self.weights = weights
     self.level = level
-    self.weight = weight
-
-  def __lt__(self, other):
-    if self.weighted:
-      return [element[0] for element in self.elements
-              ] < [element[0] for element in other.elements]
+    if min_val is None or max_val is None:
+      # Buffer is always initialized with sorted elements.
+      self.min_val = elements[0]
+      self.max_val = elements[-1]
     else:
-      return self.elements < other.elements
-
-  def sized_iterator(self):
-    class QuantileBufferIterator(object):
-      def __init__(self, elem, weighted, weight):
-        self._iter = iter(elem)
-        self.weighted = weighted
-        self.weight = weight
-
-      def __iter__(self):
-        return self
+      # Note that collapsed buffer may not contain min and max in the list of
+      # elements.
+      self.min_val = min_val
+      self.max_val = max_val
+    self._iter = zip(
+        self.elements,
+        self.weights if weighted else itertools.repeat(self.weights[0]))
 
-      def __next__(self):
-        if self.weighted:
-          return next(self._iter)
-        else:
-          value = next(self._iter)
-          return (value, self.weight)
+  def __iter__(self):
+    return self._iter
 
-      next = __next__  # For Python 2
+  def __next__(self):

Review comment:
       Python 2 support is no longer needed.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):
+    # type: (List, List, bool, int, Any, Any) -> None
     self.elements = elements
-    self.weighted = weighted
+    self.weights = weights
     self.level = level
-    self.weight = weight
-
-  def __lt__(self, other):
-    if self.weighted:
-      return [element[0] for element in self.elements
-              ] < [element[0] for element in other.elements]
+    if min_val is None or max_val is None:
+      # Buffer is always initialized with sorted elements.
+      self.min_val = elements[0]
+      self.max_val = elements[-1]
     else:
-      return self.elements < other.elements
-
-  def sized_iterator(self):
-    class QuantileBufferIterator(object):
-      def __init__(self, elem, weighted, weight):
-        self._iter = iter(elem)
-        self.weighted = weighted
-        self.weight = weight
-
-      def __iter__(self):
-        return self
+      # Note that collapsed buffer may not contain min and max in the list of
+      # elements.
+      self.min_val = min_val
+      self.max_val = max_val
+    self._iter = zip(
+        self.elements,
+        self.weights if weighted else itertools.repeat(self.weights[0]))
 
-      def __next__(self):
-        if self.weighted:
-          return next(self._iter)
-        else:
-          value = next(self._iter)
-          return (value, self.weight)
+  def __iter__(self):
+    return self._iter

Review comment:
       This will break if it's called twice. Instead put the call to zip here. 
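
The failure mode described here can be reproduced in isolation. The two classes below are simplified stand-ins written for this illustration (they are not the PR's `_QuantileBuffer`): one stores a single `zip` object in `__init__`, the other builds a new `zip` on each `__iter__` call.

```python
class StoredIteratorBuffer(object):
  """Stand-in for the pattern under review: zip is built once."""
  def __init__(self, elements, weights):
    self._iter = zip(elements, weights)

  def __iter__(self):
    # Every call hands back the same, possibly exhausted, iterator.
    return self._iter


class FreshIteratorBuffer(object):
  """Stand-in for the suggested fix: zip is built per iteration."""
  def __init__(self, elements, weights):
    self.elements = elements
    self.weights = weights

  def __iter__(self):
    return zip(self.elements, self.weights)


stored = StoredIteratorBuffer([1, 2], [1, 1])
stored_first = list(stored)
stored_second = list(stored)  # The shared zip object is already consumed.

fresh = FreshIteratorBuffer([1, 2], [1, 1])
fresh_first = list(fresh)
fresh_second = list(fresh)
```

In Python 3, `zip` returns a one-shot iterator, so `stored_second` comes back empty while `fresh_second` matches `fresh_first`.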

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):
+    # type: (List, List, bool, int, Any, Any) -> None
     self.elements = elements
-    self.weighted = weighted
+    self.weights = weights
     self.level = level
-    self.weight = weight
-
-  def __lt__(self, other):
-    if self.weighted:
-      return [element[0] for element in self.elements
-              ] < [element[0] for element in other.elements]
+    if min_val is None or max_val is None:
+      # Buffer is always initialized with sorted elements.
+      self.min_val = elements[0]
+      self.max_val = elements[-1]
     else:
-      return self.elements < other.elements
-
-  def sized_iterator(self):
-    class QuantileBufferIterator(object):
-      def __init__(self, elem, weighted, weight):
-        self._iter = iter(elem)
-        self.weighted = weighted
-        self.weight = weight
-
-      def __iter__(self):
-        return self
+      # Note that collapsed buffer may not contain min and max in the list of
+      # elements.
+      self.min_val = min_val
+      self.max_val = max_val
+    self._iter = zip(
+        self.elements,
+        self.weights if weighted else itertools.repeat(self.weights[0]))
 
-      def __next__(self):
-        if self.weighted:
-          return next(self._iter)
-        else:
-          value = next(self._iter)
-          return (value, self.weight)
+  def __iter__(self):
+    return self._iter
 
-      next = __next__  # For Python 2
+  def __next__(self):
+    return next(self._iter)
 
-    return QuantileBufferIterator(self.elements, self.weighted, self.weight)
+  def __lt__(self, other):
+    return self.level < other.level
 
 
-class _QuantileState(Generic[T]):
+class _QuantileState(object):

Review comment:
       Same.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):

Review comment:
       Can we doubly inherit to keep the type checking? 
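
One possible reading of this suggestion, sketched under the assumption that the buffer would inherit from both a plain base class and `typing.Generic`. The names `BufferBase` and `TypedBuffer` are hypothetical; the thread does not say which base class the reviewer had in mind.

```python
from typing import Generic, List, TypeVar

T = TypeVar('T')


class BufferBase(object):
  """Hypothetical untyped base that holds the shared state."""
  def __init__(self, elements, level=0):
    self.elements = elements
    self.level = level


class TypedBuffer(BufferBase, Generic[T]):
  """Takes behavior from BufferBase and the type parameter from Generic."""
  def __init__(self, elements, level=0):
    # type: (List[T], int) -> None
    super().__init__(elements, level)

  def first(self):
    # type: () -> T
    return self.elements[0]


buf = TypedBuffer[int]([1, 2, 3])
```

Note that `Generic[T]` has to come after the concrete base in the list of bases; listing `object` first would fail with an MRO error.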







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r574969859



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       Sounds good.







[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r563427603



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       1. Can you please describe the structure of a 'batch' here, in particular for the weighted case? Consider also adding an example to line 295.
   
   2. Looking at the tests, it seems that for the weighted case with batches, we expect users to provide elements and weights as separate lists. From an API/usability standpoint, what is the rationale for providing weights as a separate list, as opposed to pairing each element with its weight in a tuple, which is how elements are represented in the non-batched case?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -501,6 +781,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
     weighted: (optional) if set to True, the combiner produces weighted
       quantiles. The input elements are then expected to be tuples of input
       values with the corresponding weight.
+    batch_input: (optional) if set to True, inputs are expected to be batches of

Review comment:
       Re: line 766: could you please clarify what N is in the docstring?
   
   Also, can you please note that the algorithm from the referenced paper is generalized here to compute weighted quantiles?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       I am not sure how k and b are computed here (see line 872). It seems that b follows the experimental evaluation suggested in Sect. 4.3 of the paper, which corresponds to the 'Munro-Paterson algorithm', while the experimental evaluation for the 'new' algorithm is covered in Sect. 4.5. Looking at Table 1, the 'new' algorithm has lower values of k and b, so perhaps it is worth reexamining this logic.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any

Review comment:
       Would this hint work here?
   ```
   # type: (List) -> Callable[[int], Any]
   ```
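
To illustrate the suggested hint: `get_argsort_key` returns a function that maps an index to a sort key, which is what `Callable[[int], Any]` describes. The sketch below is a standalone version written for illustration; the free function and its `key` parameter are assumptions, not the PR's `_QuantileSpec` method.

```python
from typing import Any, Callable, List, Optional


def get_argsort_key(elements, key=None):
  # type: (List, Optional[Callable[[Any], Any]]) -> Callable[[int], Any]

  """Returns a function mapping an index to the sort key of its element."""
  if key is None:
    # Bound method: index -> elements[index].
    return elements.__getitem__
  return lambda idx: key(elements[idx])


elements = ['ccc', 'a', 'bb']

# Indices ordered by the elements' natural ordering.
order = sorted(range(len(elements)), key=get_argsort_key(elements))

# Indices ordered by descending length, using a custom key.
by_len_desc = sorted(
    range(len(elements)),
    key=get_argsort_key(elements, key=len),
    reverse=True)
```

Sorting indices rather than values keeps a parallel list (such as weights) aligned, since the same index order can be applied to both.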

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       Wording suggestion: s/batch_input/input_batched or inputs_batched, since the parameter refers to the input rather than the result (as in the case of reverse).

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       Looks like there is already a binary version: https://pypi.org/project/mmh3-binary/, with a somewhat recent release (Apr 2020, but only 3.6 wheels: https://pypi.org/project/mmh3-binary/#files).

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       One downside is that mmh3 has only a source release and does not publish wheel files. Installing mmh3 requires certain C++ compiler/header dependencies to be present on the machine. It appears that the project is no longer maintained. I tried to contact the maintainer and did not receive a response. Note that sklearn has also implemented a Python wrapper for murmurhash: https://scikit-learn.org/stable/modules/generated/sklearn.utils.murmurhash3_32.html. We could likewise incorporate murmurhash into the Beam codebase, make a (maintainable) fork of mmh3 and release wheel files, use sklearn's implementation, or explore a different library for our hashing needs.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -452,15 +511,236 @@ def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
     # into new, full buffers and then take them into account when computing the
     # final output.
     self.unbuffered_elements = unbuffered_elements
+    self.unbuffered_weights = unbuffered_weights
+
+  def __reduce__(self):

Review comment:
       For my education, why was this required? Is there some internal state that gets in the way of pickling? Also, could you please add a comment?
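
This part of the thread does not record an answer to the question above. As an assumption only, one common reason for a custom `__reduce__` is that an object stores lambdas, which the standard `pickle` module cannot serialize; `__reduce__` sidesteps this by rebuilding the object from its constructor arguments. A minimal sketch with hypothetical class names:

```python
import pickle


class SpecWithoutReduce(object):
  """Hypothetical class that stores a lambda, similar in spirit to a spec."""
  def __init__(self, reverse):
    self.reverse = reverse
    # Lambdas defined here cannot be pickled by the standard library.
    if reverse:
      self.less_than = lambda a, b: a > b
    else:
      self.less_than = lambda a, b: a < b


class SpecWithReduce(SpecWithoutReduce):
  def __reduce__(self):
    # Pickle only the constructor arguments; the lambda is rebuilt by
    # __init__ when the object is unpickled.
    return (self.__class__, (self.reverse, ))


def pickles_ok(obj):
  """Returns True if obj survives a pickle round trip."""
  try:
    pickle.loads(pickle.dumps(obj))
    return True
  except (pickle.PicklingError, AttributeError, TypeError):
    return False


restored = pickle.loads(pickle.dumps(SpecWithReduce(reverse=True)))
```

Whether this is the motivation in the PR (as opposed to, say, a requirement of the compiled extension types) is not stated in the thread.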

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):

Review comment:
       Please add a comment that in the non-weighted case `weights` stores a single element: the weight of the buffer in the sense of the algorithm. In the generalized (weighted) case, it stores the weights of the individual elements.

##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       I have some concerns about the test coverage of ApproximateQuantiles.
   
   1. Do any of the tests exercise merging of accumulators?
   2. Do any of the tests exercise collapsing of multiple buffers, including buffers with same & different weights?
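
One way a test can exercise accumulator merging directly, rather than relying on the runner to shard the input, is to split the data by hand, accumulate each shard separately, and merge the results. The sketch below shows the pattern with a toy mean combiner that mimics the four CombineFn methods; `MeanCombiner` and `run_sharded` are hypothetical names, and this is not Beam's ApproximateQuantilesCombineFn.

```python
class MeanCombiner(object):
  """Toy stand-in exposing the same four methods as a CombineFn."""
  def create_accumulator(self):
    return (0.0, 0)

  def add_input(self, accumulator, element):
    total, count = accumulator
    return (total + element, count + 1)

  def merge_accumulators(self, accumulators):
    totals, counts = zip(*accumulators)
    return (sum(totals), sum(counts))

  def extract_output(self, accumulator):
    total, count = accumulator
    return total / count if count else float('nan')


def run_sharded(combiner, elements, num_shards):
  """Accumulates each shard independently, then merges all accumulators."""
  accumulators = []
  for shard_index in range(num_shards):
    accumulator = combiner.create_accumulator()
    # Round-robin sharding: every num_shards-th element goes to this shard.
    for element in elements[shard_index::num_shards]:
      accumulator = combiner.add_input(accumulator, element)
    accumulators.append(accumulator)
  return combiner.extract_output(combiner.merge_accumulators(accumulators))


single = run_sharded(MeanCombiner(), list(range(10)), num_shards=1)
sharded = run_sharded(MeanCombiner(), list(range(10)), num_shards=3)
```

For an approximate combiner, the comparison between the single-shard and multi-shard results would need a tolerance rather than equality.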

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
       num_buffers,  # type: int
       key=None,
       reverse=False,
-      weighted=False):
-    def _comparator(a, b):
-      if key:
-        a, b = key(a), key(b)
-
-      retval = int(a > b) - int(a < b)
-
-      if reverse:
-        return -retval
-
-      return retval
-
-    self._comparator = _comparator
-
+      weighted=False,
+      batch_input=False):
     self._num_quantiles = num_quantiles
-    self._buffer_size = buffer_size
-    self._num_buffers = num_buffers
-    if weighted:
-      self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
-    else:
-      self._key = key
-    self._reverse = reverse
-    self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, reverse)
+    self._batch_input = batch_input
+    if self._batch_input:
+      setattr(self, 'add_input', self._add_inputs)

Review comment:
       Nit: `self.add_input = self._add_inputs` may be easier to read.
   

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       Would it make sense to make _collapse, _interpolate, and _offset methods of the _QuantileState class? Would that affect Cythonization or performance?
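
For readers following the removed `_interpolate` docstring, the selection it describes can be emulated naively: repeat each buffer's elements according to the buffer weight, sort the union, and pick the (k * step + offset)-th elements. This sketch covers only the unweighted case, materializes the whole union (which the removed code avoids by merging iterators), and uses hypothetical names throughout.

```python
def naive_collapse(buffers, count, step, offset):
  """Picks `count` elements from the weighted, ordered union of buffers.

  Args:
    buffers: list of (elements, weight) pairs, where weight is a positive
      integer applying to every element of that buffer.
    count: number of elements to pick.
    step: distance between consecutive picks in the expanded union.
    offset: position of the first pick in the expanded union.
  """
  expanded = []
  for elements, weight in buffers:
    for element in elements:
      # Repeat each element according to its buffer's weight.
      expanded.extend([element] * weight)
  expanded.sort()

  picked = []
  for k in range(count):
    # Clamp to the last element if the target runs past the end.
    index = min(int(k * step + offset), len(expanded) - 1)
    picked.append(expanded[index])
  return picked


# Two buffers of weight 1 collapse into one buffer of weight 2.
collapsed = naive_collapse(
    [([1, 3, 5, 7], 1), ([2, 4, 6, 8], 1)], count=4, step=2, offset=1)
```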
   







[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-736657089


   gentle ping @robertwb 





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570715768



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -501,6 +781,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
     weighted: (optional) if set to True, the combiner produces weighted
       quantiles. The input elements are then expected to be tuples of input
       values with the corresponding weight.
+    batch_input: (optional) if set to True, inputs are expected to be batches of

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       1. Done, also added examples.
   2. I think a tuple (element, weight) generalizes equally well to (elements, weights) and to [(element1, weight1), ...], so I don't see a strong usability advantage either way (for instance, TFT's quantiles take them as separate tensors). There is, however, a code-simplicity benefit to taking (elements, weights): it lets the weighted and unweighted cases share a lot of code.
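
The two layouts discussed above carry the same information and convert into each other mechanically. The helpers below are a sketch with hypothetical names; they do not call Beam, and the exact batch structure the transform accepts should be taken from the PR's docstrings rather than from this example.

```python
def to_batch(weighted_elements):
  """Converts [(element, weight), ...] into (elements, weights)."""
  if not weighted_elements:
    return ([], [])
  elements, weights = zip(*weighted_elements)
  return (list(elements), list(weights))


def from_batch(batch):
  """Converts (elements, weights) back into [(element, weight), ...]."""
  elements, weights = batch
  return list(zip(elements, weights))


pairs = [('a', 1.0), ('b', 2.5), ('c', 0.5)]
batch = to_batch(pairs)
```

With the (elements, weights) layout, the element list has the same shape in the weighted and unweighted cases, which is the code-sharing benefit mentioned in the reply.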

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
       num_buffers,  # type: int
       key=None,
       reverse=False,
-      weighted=False):
-    def _comparator(a, b):
-      if key:
-        a, b = key(a), key(b)
-
-      retval = int(a > b) - int(a < b)
-
-      if reverse:
-        return -retval
-
-      return retval
-
-    self._comparator = _comparator
-
+      weighted=False,
+      batch_input=False):
     self._num_quantiles = num_quantiles
-    self._buffer_size = buffer_size
-    self._num_buffers = num_buffers
-    if weighted:
-      self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
-    else:
-      self._key = key
-    self._reverse = reverse
-    self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, reverse)
+    self._batch_input = batch_input
+    if self._batch_input:
+      setattr(self, 'add_input', self._add_inputs)

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       Yes, the logic of the Munro-Paterson algorithm is used here. Switching to the calculation from Sect. 4.5 would reduce the size of the (full) accumulator, but it is probably out of scope for this PR. Should I leave a TODO?
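
As a rough sketch of the Munro-Paterson-style sizing under discussion: pick the largest number of buffers b with (b - 2) * 2^(b - 2) below epsilon * N, then size each buffer so that k * 2^(b - 1) covers N elements. The function name is hypothetical and the constraints are stated as an assumption; this is not a quotation of the paper or of the code at line 872.

```python
import math


def munro_paterson_sizes(epsilon, max_num_elements):
  """Returns (num_buffers, buffer_size) for the given error and input size.

  Assumed constraints: (b - 2) * 2**(b - 2) < epsilon * N for the chosen b,
  and k * 2**(b - 1) >= N.
  """
  b = 2
  while (b - 2) * (1 << (b - 2)) < epsilon * max_num_elements:
    b += 1
  # The loop overshoots by one; step back to the last value that fit.
  b -= 1
  k = max(2, int(math.ceil(max_num_elements / float(1 << (b - 1)))))
  return b, k


num_buffers, buffer_size = munro_paterson_sizes(
    epsilon=0.01, max_num_elements=10**6)
```

The full accumulator holds roughly num_buffers * buffer_size elements, which is the quantity a switch to the Sect. 4.5 calculation would aim to reduce.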

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any

Review comment:
       Done.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       I don't think this would cause any problems with Cythonization or performance. They would be static methods, though, so the only difference is the namespace, and neither of them deals with _QuantileState objects. I don't have a strong preference, WDYT?

##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       1. I think the tests use DirectRunner, so probably not.
   2. The approximation is properly tested only if either the number of inputs is large with default settings, or max_num_elements and epsilon are set to extremely low and large values, respectively. I tested the approximation with a large number of inputs and FlumeCppRunner during development, but it took some time to complete, so it's probably not suitable for continuous testing. It might make sense for me to initialize the CombineFn with the extreme values and test add_input, merge_accumulators and extract_output directly, WDYT?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -452,15 +511,236 @@ def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
     # into new, full buffers and then take them into account when computing the
     # final output.
     self.unbuffered_elements = unbuffered_elements
+    self.unbuffered_weights = unbuffered_weights
+
+  def __reduce__(self):

Review comment:
       When Cythonization is enabled, pickling fails without it. I can look up the error description if you're interested. Added a comment.
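
A rough pure-Python analogy of why a `__reduce__` method helps here; the actual Cython error is not quoted in this thread, so this only sketches the general mechanism. The class names `SpecWithoutReduce` and `SpecWithReduce` and the helper `can_pickle` are made up for illustration. An object that stores a locally defined lambda cannot be pickled by default, while `__reduce__` tells pickle to rebuild the object from its constructor arguments instead.

```python
import pickle


class SpecWithoutReduce(object):
  def __init__(self, key, reverse):
    self.key = key
    self.reverse = reverse
    # Derived lambda stored on the instance; default pickling would have to
    # serialize it, which the standard pickler cannot do for local lambdas.
    self.less_than = (lambda a, b: a > b) if reverse else (lambda a, b: a < b)


class SpecWithReduce(SpecWithoutReduce):
  def __reduce__(self):
    # Rebuild from constructor arguments only; __init__ recreates less_than.
    return (self.__class__, (self.key, self.reverse))


def can_pickle(obj):
  """Returns True if pickle.dumps succeeds for obj."""
  try:
    pickle.dumps(obj)
    return True
  except (pickle.PicklingError, AttributeError, TypeError):
    return False


# This is the reconstruction pickle performs on load: call the class with
# the arguments returned by __reduce__.
spec = SpecWithReduce(None, True)
cls, args = spec.__reduce__()
rebuilt = cls(*args)
```

With `__reduce__` in place, only the constructor arguments need to be picklable, so a `key` that is itself an unpicklable callable would still fail.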

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       Should I make it a dependency then?
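
For readers skimming the hunk above, a minimal standalone sketch of the same fallback idea: prefer mmh3 when it is installed and otherwise truncate an md5 digest to 64 bits. The function names `md5_fingerprint` and `mmh3_fingerprint` are illustrative and not the names used in the PR; the `mmh3.hash64` call is copied from the quoted diff.

```python
import hashlib

try:
  import mmh3  # Optional third-party dependency.
except ImportError:
  mmh3 = None


def md5_fingerprint(value):
  # md5 yields 128 bits (32 hex digits); keep the first 16 digits (64 bits).
  return int(hashlib.md5(value).hexdigest()[:16], 16)


def mmh3_fingerprint(value):
  # Same call as in the quoted diff: take the first of the two unsigned
  # 64-bit integers that hash64 returns.
  return mmh3.hash64(value, seed=0, signed=False)[0]


# Resolve the default once at import time, as the diff does.
default_hash_fn = md5_fingerprint if mmh3 is None else mmh3_fingerprint
fingerprint = default_hash_fn(b'abc')
```

Either branch produces an unsigned 64-bit integer, but the two hash functions give different values for the same input, so results depend on whether mmh3 is installed.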




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r565652894



##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       We should have codecov working again, but somehow retesting didn't trigger it. I think if you add commits to this branch or repush it, codecov will run and may give some signals re: test coverage.







[GitHub] [beam] aaltay commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-730712645


   @iindyk - What is the next step on this PR?





[GitHub] [beam] tvalentyn commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-781783340


   Run PythonDocker PreCommit





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r521434129



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -299,15 +300,17 @@ class ApproximateQuantiles(object):
     out: [0, 2, 5, 7, 100]
   """
   @staticmethod
-  def _display_data(num_quantiles, key, reverse, weighted):
+  def _display_data(num_quantiles, key, reverse, weighted, batch_input):
     return {
         'num_quantiles': DisplayDataItem(num_quantiles, label='Quantile Count'),
         'key': DisplayDataItem(
             key.__name__
             if hasattr(key, '__name__') else key.__class__.__name__,
             label='Record Comparer Key'),
         'reverse': DisplayDataItem(str(reverse), label='Is Reversed'),
-        'weighted': DisplayDataItem(str(weighted), label='Is Weighted')
+        'weighted': DisplayDataItem(str(weighted), label='Is Weighted'),
+        'batch_input': DisplayDataItem(
+            str(batch_input), label='Is Input Batched')

Review comment:
       Done.







[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r573215945



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       SG, thank you.







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716312



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       Yes, the logic of the Munro-Paterson algorithm is used here. Switching to the calculation from 4.5 would make it possible to reduce the size of the (full) accumulator. But that's probably out of scope for this PR; should I leave a TODO?

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any

Review comment:
       Done.
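
A small standalone sketch of the two helpers in the `_QuantileSpec` hunk above: the comparator selected from `key` and `reverse`, and the argsort key whose body appears elsewhere in this thread. The free functions `make_less_than` and `get_argsort_key` are illustrative rewrites, not the PR's class methods.

```python
def make_less_than(key=None, reverse=False):
  """Returns a two-argument comparison honoring key and reverse."""
  if reverse and key is None:
    return lambda a, b: a > b
  elif reverse:
    return lambda a, b: key(a) > key(b)
  elif key is None:
    return lambda a, b: a < b
  else:
    return lambda a, b: key(a) < key(b)


def get_argsort_key(elements, key=None):
  """Returns a key for sorting indices of elements by element value."""
  if key is None:
    return elements.__getitem__
  return lambda idx: key(elements[idx])


words = ['ccccc', 'b', 'aaa']
# Sort positions rather than values, so a parallel list (for example
# weights) can be reordered with the same index order.
order = sorted(range(len(words)), key=get_argsort_key(words, key=len))
shorter = make_less_than(key=len)
longer_first = make_less_than(key=len, reverse=True)
```

Sorting indices instead of the elements themselves is what lets values and their weights stay in separate lists while still being ordered together.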







[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (59b4d6a) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.52%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   83.01%   +0.52%     
   ==========================================
     Files         455      469      +14     
     Lines       54876    58331    +3455     
   ==========================================
   + Hits        45266    48425    +3159     
   - Misses       9610     9906     +296     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.07% <ø> (-0.22%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.07% <ø> (-4.34%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [147 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...59b4d6a](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-773783934


   Thanks for the review, Valentyn!





[GitHub] [beam] tvalentyn merged pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn merged pull request #13175:
URL: https://github.com/apache/beam/pull/13175


   





[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r573235644



##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       Given that a typo that creeps in might not be discovered until the code runs on a sufficiently large input, I'd be more comfortable merging this if we added targeted unit tests just for the methods in question, exercising the logic that is not exercised in the direct runner test due to small input size/nondeterminism.







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r514366013



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):

Review comment:
       Is there a good way to make this inheritance work with Cython? 
   It doesn't compile with inheritance from a non-extension type.







[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-724033715


   retest this please





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r574970762



##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       Makes sense. I added a test that creates a CombineFn with an extremely small max_num_elements and manually splits the data into a bunch of accumulators, which are then merged. The small max_num_elements also ensures that buffer collapsing and interpolation are exercised.
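
The shape of such a test can be sketched without a runner. This is not the test added in the PR: `ToyQuantilesCombineFn` is a hypothetical stand-in that keeps every element and computes exact quantiles, so it shows only the shard-then-merge structure (create_accumulator, add_input, merge_accumulators, extract_output) and not the approximation itself.

```python
import random


class ToyQuantilesCombineFn(object):
  """Hypothetical stand-in for a quantiles CombineFn; keeps every element."""
  def __init__(self, num_quantiles):
    self._num_quantiles = num_quantiles

  def create_accumulator(self):
    return []

  def add_input(self, accumulator, element):
    accumulator.append(element)
    return accumulator

  def merge_accumulators(self, accumulators):
    merged = []
    for accumulator in accumulators:
      merged.extend(accumulator)
    return merged

  def extract_output(self, accumulator):
    ordered = sorted(accumulator)
    last = len(ordered) - 1
    steps = self._num_quantiles - 1
    # Evenly spaced ranks from the minimum to the maximum.
    return [ordered[(i * last) // steps] for i in range(self._num_quantiles)]


def quantiles_via_shards(combine_fn, data, num_shards):
  """Feeds data through several accumulators, then merges them."""
  accumulators = []
  for shard in range(num_shards):
    accumulator = combine_fn.create_accumulator()
    for element in data[shard::num_shards]:
      accumulator = combine_fn.add_input(accumulator, element)
    accumulators.append(accumulator)
  merged = combine_fn.merge_accumulators(accumulators)
  return combine_fn.extract_output(merged)


data = list(range(101))
random.Random(0).shuffle(data)
result = quantiles_via_shards(ToyQuantilesCombineFn(5), data, num_shards=7)
```

With a real approximate CombineFn in place of the stand-in, the final comparison would presumably check each quantile against the exact answer within a tolerance rather than for equality.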







[GitHub] [beam] tvalentyn edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-846166764


   > One downside is that mmh3 has only source release, and does not release wheel files. Installing mmh3 requires certain c++ compiler/headers dependencies be present on the machine. 
   
   Looks like a recent release includes the wheels: https://pypi.org/project/mmh3/3.0.0/.
   So we can move a dependency on mmh3 to Beam instead of tfx.





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570719799



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -452,15 +511,236 @@ def __init__(self, buffer_size, num_buffers, unbuffered_elements, buffers):
     # into new, full buffers and then take them into account when computing the
     # final output.
     self.unbuffered_elements = unbuffered_elements
+    self.unbuffered_weights = unbuffered_weights
+
+  def __reduce__(self):

Review comment:
       When Cythonization is enabled, pickling fails without it. I can look up the error description if you're interested. Added a comment.







[GitHub] [beam] robertwb commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r520973512



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -299,15 +300,17 @@ class ApproximateQuantiles(object):
     out: [0, 2, 5, 7, 100]
   """
   @staticmethod
-  def _display_data(num_quantiles, key, reverse, weighted):
+  def _display_data(num_quantiles, key, reverse, weighted, batch_input):
     return {
         'num_quantiles': DisplayDataItem(num_quantiles, label='Quantile Count'),
         'key': DisplayDataItem(
             key.__name__
             if hasattr(key, '__name__') else key.__class__.__name__,
             label='Record Comparer Key'),
         'reverse': DisplayDataItem(str(reverse), label='Is Reversed'),
-        'weighted': DisplayDataItem(str(weighted), label='Is Weighted')
+        'weighted': DisplayDataItem(str(weighted), label='Is Weighted'),
+        'batch_input': DisplayDataItem(
+            str(batch_input), label='Is Input Batched')

Review comment:
       Go ahead and put a trailing comma on this one too. 

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))

Review comment:
       Ah, OK. Thanks for the info.

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):

Review comment:
       You should be able to inherit from both object and Generic[T]. 







[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r514363995



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       Not sure if it's still maintained; the last release was 3 years ago. I'm not aware of any other downsides, wdyt?
   
    https://github.com/hajimes/mmh3
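
For reference, the md5 fallback in the hunk above keeps only the first 16 hex digits of the digest, that is, the top 64 bits of the 128-bit hash. A minimal standalone sketch of that truncation follows; the function name `md5_fingerprint64` is made up for illustration and is not part of Beam.

```python
import hashlib


def md5_fingerprint64(value: bytes) -> int:
  """Returns the first 64 bits of the md5 digest of `value` as an int."""
  # hexdigest() yields 32 hex digits (128 bits); 16 hex digits are 64 bits.
  return int(hashlib.md5(value).hexdigest()[:16], 16)


fingerprint = md5_fingerprint64(b'beam')
```

The result always fits in an unsigned 64-bit integer, which is the same range as the first value returned by `mmh3.hash64(..., signed=False)` in the hunk.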

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:

Review comment:
       Done.







[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (59b4d6a) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.52%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   83.01%   +0.52%     
   ==========================================
     Files         455      469      +14     
     Lines       54876    58331    +3455     
   ==========================================
   + Hits        45266    48425    +3159     
   - Misses       9610     9906     +296     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.07% <ø> (-0.22%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.07% <ø> (-4.34%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [147 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...59b4d6a](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r521438895



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if key is None and not reverse:
+      self.less_than = lambda a, b: a < b
+    elif key is None:
+      self.less_than = lambda a, b: a > b
+    elif not reverse:
+      self.less_than = lambda a, b: key(a) < key(b)
+    else:
+      self.less_than = lambda a, b: key(a) > key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):

Review comment:
       When I inherit as _QuantileBuffer(object, Generic[T]), without Cython I get 
   ```
   TypeError: Cannot create a consistent method resolution order (MRO) for bases object, Generic
   ```
   With _QuantileBuffer(Generic[T], object) it works in pure Python, but with Cythonization I get 
   ```
   First base of '_QuantileBuffer' is not an extension type.
   ```
   and
   ```
   Only one extension type base class allowed.
   ```
   Am I missing something?
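
The pure-Python half of this can be reproduced in isolation. In the sketch below (the class names are placeholders, not the Beam classes), listing `object` before `Generic[T]` fails because `Generic` itself derives from `object`, so no linearization can place `object` first; the reverse order is accepted. The Cython errors are not reproduced here.

```python
from typing import Generic, TypeVar

T = TypeVar('T')


class GenericFirst(Generic[T], object):
  """Accepted: Generic precedes its own base class, object."""


try:
  # object is listed before Generic, which conflicts with Generic's own MRO.
  class ObjectFirst(object, Generic[T]):
    pass

  object_first_error = None
except TypeError as e:
  object_first_error = str(e)
```

Since every class already derives from `object` in Python 3, listing it explicitly next to `Generic[T]` adds nothing in pure Python; it only matters for how Cython treats the class.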







[GitHub] [beam] codecov[bot] commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (8fff438) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.30%`.
   > The diff coverage is `81.81%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   82.78%   +0.30%     
   ==========================================
     Files         455      466      +11     
     Lines       54876    57589    +2713     
   ==========================================
   + Hits        45266    47677    +2411     
   - Misses       9610     9912     +302     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.97% <ø> (+0.67%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.11% <ø> (-4.30%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [137 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...8fff438](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716042



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -327,27 +330,39 @@ class Globally(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate

Review comment:
       1. Done, also added examples.
   2. I think the tuple (element, weight) generalizes equally well to (elements, weights) and to [(element1, weight1), ...], so I don't see a strong usability advantage either way (for instance, TFT's quantiles take them as separate tensors). Taking (elements, weights) does simplify the code, though: it lets the weighted and unweighted cases share a lot of code.
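
To make the two candidate batch layouts concrete, here is a short sketch in plain Python with no Beam dependency; the sample values are invented. It shows that a batch of (element, weight) pairs and a single (elements, weights) pair carry the same information and convert into each other with `zip`.

```python
# Layout A: a batch is a list of (element, weight) tuples.
pairs_batch = [(1.0, 0.5), (2.0, 1.5), (3.0, 1.0)]

# Layout B: a batch is one (elements, weights) tuple of parallel lists.
elements, weights = (list(column) for column in zip(*pairs_batch))
columns_batch = (elements, weights)

# Converting back from layout B to layout A.
round_tripped = list(zip(*columns_batch))
```

With layout B, an unweighted batch is just the `elements` list on its own, which fits the point about the two cases sharing code.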









[GitHub] [beam] tvalentyn commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-776266859


   Thanks, Ihor, for working on this optimization, and sorry for the delayed reviews. Overall this LGTM; my only remaining concern is that we may need to strengthen unit test coverage for accumulator logic that is not exercised by the current direct runner test (see comments above). 
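
One way to cover accumulator paths that a runner may never hit is to call the combiner methods directly in a unit test. The sketch below uses a toy stand-in class, `SortedValuesCombiner`, which is hypothetical and is not a Beam class or the ApproximateQuantiles combiner; it only mirrors the four CombineFn-style method names. The helper `combine_in_shards` is likewise invented for the example.

```python
import heapq


class SortedValuesCombiner:
  """Toy stand-in with the four CombineFn-style methods (not a Beam class)."""
  def create_accumulator(self):
    return []

  def add_input(self, accumulator, element):
    accumulator.append(element)
    return accumulator

  def merge_accumulators(self, accumulators):
    # Each accumulator is sorted first so heapq.merge sees sorted inputs.
    return list(heapq.merge(*(sorted(a) for a in accumulators)))

  def extract_output(self, accumulator):
    return sorted(accumulator)


def combine_in_shards(combiner, shards):
  """Accumulates each shard separately, then merges, as a runner might."""
  accumulators = []
  for shard in shards:
    accumulator = combiner.create_accumulator()
    for element in shard:
      accumulator = combiner.add_input(accumulator, element)
    accumulators.append(accumulator)
  return combiner.extract_output(combiner.merge_accumulators(accumulators))


result = combine_in_shards(SortedValuesCombiner(), [[5, 1], [], [3, 2, 4]])
```

Driving the methods this way lets a test choose shard boundaries explicitly, including empty shards, instead of relying on however the direct runner happens to bundle elements.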





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r514365929



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,129 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))

Review comment:
       And would also increase memory usage by elements_in_buffers * weight_type_size, which may be significant relative to the current memory usage.
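
The estimate in the comment above can be written out as simple arithmetic. This is only a rough sketch: the function name `extra_weight_bytes`, the 8-byte weight size, and the sample buffer dimensions are assumptions made for illustration, and it assumes every buffer is full.

```python
def extra_weight_bytes(buffer_size, num_buffers, weight_type_size=8):
    """Estimate extra bytes needed to store one weight per buffered element.

    Assumes all buffers are full and each weight takes `weight_type_size`
    bytes (8 is an assumption, e.g. a C double).
    """
    elements_in_buffers = buffer_size * num_buffers
    return elements_in_buffers * weight_type_size


# Hypothetical sizes, chosen only to make the arithmetic concrete.
extra = extra_weight_bytes(buffer_size=1000, num_buffers=10)
print(extra)
```

With these made-up sizes the overhead is 1000 * 10 * 8 bytes; whether that is significant depends on the size of the buffered elements themselves.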




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570716102



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -368,82 +383,126 @@ class PerKey(PTransform):
       weighted: (optional) if set to True, the transform returns weighted
         quantiles. The input PCollection is then expected to contain tuples of
         input values with the corresponding weight.
+      batch_input: (optional) if set to True, the transform expects each element
+        of input PCollection to be a batch. Provides a way to accumulate
+        multiple elements at a time more efficiently.
     """
-    def __init__(self, num_quantiles, key=None, reverse=False, weighted=False):
+    def __init__(
+        self,
+        num_quantiles,
+        key=None,
+        reverse=False,
+        weighted=False,
+        batch_input=False):
       self._num_quantiles = num_quantiles
       self._key = key
       self._reverse = reverse
       self._weighted = weighted
+      self._batch_input = batch_input
 
     def expand(self, pcoll):
       return pcoll | CombinePerKey(
           ApproximateQuantilesCombineFn.create(
               num_quantiles=self._num_quantiles,
               key=self._key,
               reverse=self._reverse,
-              weighted=self._weighted))
+              weighted=self._weighted,
+              batch_input=self._batch_input))
 
     def display_data(self):
       return ApproximateQuantiles._display_data(
           num_quantiles=self._num_quantiles,
           key=self._key,
           reverse=self._reverse,
-          weighted=self._weighted)
+          weighted=self._weighted,
+          batch_input=self._batch_input)
+
+
+class _QuantileSpec(object):
+  """Quantiles computation specifications."""
+  def __init__(self, buffer_size, num_buffers, weighted, key, reverse):
+    # type: (int, int, bool, Any, bool) -> None
+    self.buffer_size = buffer_size
+    self.num_buffers = num_buffers
+    self.weighted = weighted
+    self.key = key
+    self.reverse = reverse
+
+    # Used to sort tuples of values and weights.
+    self.weighted_key = None if key is None else (lambda x: key(x[0]))
+
+    # Used to compare values.
+    if reverse and key is None:
+      self.less_than = lambda a, b: a > b
+    elif reverse:
+      self.less_than = lambda a, b: key(a) > key(b)
+    elif key is None:
+      self.less_than = lambda a, b: a < b
+    else:
+      self.less_than = lambda a, b: key(a) < key(b)
+
+  def get_argsort_key(self, elements):
+    # type: (List) -> Any
+
+    """Returns a key for sorting indices of elements by element's value."""
+    if self.key is None:
+      return elements.__getitem__
+    else:
+      return lambda idx: self.key(elements[idx])
+
+  def __reduce__(self):
+    return (
+        self.__class__,
+        (
+            self.buffer_size,
+            self.num_buffers,
+            self.weighted,
+            self.key,
+            self.reverse))
 
 
-class _QuantileBuffer(Generic[T]):
+class _QuantileBuffer(object):
   """A single buffer in the sense of the referenced algorithm.
   (see http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.6.6513&rep=rep1
   &type=pdf and ApproximateQuantilesCombineFn for further information)"""
-  def __init__(self, elements, weighted, level=0, weight=1):
-    # type: (Sequence[T], bool, int, int) -> None
-    # In case of weighted quantiles, elements are tuples of values and weights.
+  def __init__(
+      self, elements, weights, weighted, level=0, min_val=None, max_val=None):

Review comment:
       Done.
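
For readers following the `get_argsort_key` hunk above, here is a minimal standalone sketch of how such a key can sort indices rather than values, so that a parallel list of weights can be reordered the same way. The helper names `argsort_key` and `argsort` and the sample data are hypothetical and are not part of Beam.

```python
def argsort_key(elements, key=None):
    """Return a function mapping an index to the sort value of its element."""
    if key is None:
        # Without a user key, the element itself is the sort value.
        return elements.__getitem__
    return lambda idx: key(elements[idx])


def argsort(elements, key=None, reverse=False):
    """Return the indices that would sort `elements`."""
    return sorted(
        range(len(elements)), key=argsort_key(elements, key), reverse=reverse)


values = ["ccccc", "b", "aaa"]
weights = [1.0, 3.0, 2.0]

# Sort by string length, then apply the same order to both lists.
order = argsort(values, key=len)
sorted_values = [values[i] for i in order]
sorted_weights = [weights[i] for i in order]
print(order, sorted_values, sorted_weights)
```

Sorting indices keeps values and weights in separate sequences, which appears to be the motivation for passing `elements` and `weights` separately to `_QuantileBuffer` in the diff.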

##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
       num_buffers,  # type: int
       key=None,
       reverse=False,
-      weighted=False):
-    def _comparator(a, b):
-      if key:
-        a, b = key(a), key(b)
-
-      retval = int(a > b) - int(a < b)
-
-      if reverse:
-        return -retval
-
-      return retval
-
-    self._comparator = _comparator
-
+      weighted=False,
+      batch_input=False):
     self._num_quantiles = num_quantiles
-    self._buffer_size = buffer_size
-    self._num_buffers = num_buffers
-    if weighted:
-      self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
-    else:
-      self._key = key
-    self._reverse = reverse
-    self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, reverse)
+    self._batch_input = batch_input
+    if self._batch_input:
+      setattr(self, 'add_input', self._add_inputs)

Review comment:
       Done.







[GitHub] [beam] iindyk commented on pull request #13175: Adding Cythonization and other performance improvements to Approximat…

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-716800472


   Sorry for the large diff in a single PR; it is mostly due to moving the Cythonized code out of ApproximateQuantilesCombineFn.





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r574969997



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       Added a note.







[GitHub] [beam] tvalentyn commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-846166764


   > One downside is that mmh3 has only source release, and does not release wheel files. Installing mmh3 requires certain c++ compiler/headers dependencies be present on the machine. It
   
   Looks like a recent release includes the wheels: https://pypi.org/project/mmh3/3.0.0/.
   So we can move the mmh3 dependency to Beam instead of tfx.





[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r578894506



##########
File path: sdks/python/apache_beam/transforms/stats_test.py
##########
@@ -482,13 +482,74 @@ def test_alternate_quantiles(self):
           equal_to([["ccccc", "aaa", "b"]]),
           label='checkWithKeyAndReversed')
 
+  def test_batched_quantiles(self):

Review comment:
       Thank you, much appreciated!







[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (ad95556) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.52%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   83.01%   +0.52%     
   ==========================================
     Files         455      469      +14     
     Lines       54876    58330    +3454     
   ==========================================
   + Hits        45266    48424    +3158     
   - Misses       9610     9906     +296     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.07% <ø> (-0.22%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `87.50% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.07% <ø> (-4.34%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [148 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...59b4d6a](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r573220608



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -582,6 +861,8 @@ def create(
       weighted: (optional) if set to True, the combiner produces weighted
         quantiles. The input elements are then expected to be tuples of values
         with the corresponding weight.
+      batch_input: (optional) if set to True, inputs are expected to be batches
+        of elements.

Review comment:
       Agree that this is out of scope; SGTM to add a note in case someone happens to read through this.
   AFAICT this is not critical, and these numbers are tied to MAX_ELEMENTS, which is somewhat arbitrary and not exposed to the user. Perhaps if we decide to expose MAX_ELEMENTS as a transform param, we'd have to take a closer look at this.







[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (8fff438) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.30%`.
   > The diff coverage is `81.81%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   82.78%   +0.30%     
   ==========================================
     Files         455      466      +11     
     Lines       54876    57589    +2713     
   ==========================================
   + Hits        45266    47677    +2411     
   - Misses       9610     9912     +302     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.97% <ø> (+0.67%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.11% <ø> (-4.30%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [137 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...2f4aebf](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] iindyk commented on pull request #13175: Adding Cythonization and other performance improvements to Approximat…

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-716799440


   R: @robertwb





[GitHub] [beam] tvalentyn commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-759833658


   Took a quick look. I think I'd have to read through the original paper to form an informed opinion on the change; unless @iindyk has a reviewer in mind who is already familiar with the algorithm, I am happy to take a look. It will take me a bit of time to get up to speed on the algorithm, but it looks interesting.
   @iindyk - are the optimizations also described in [MRL98], or is your change an optimization on top of that algorithm? Also, would you mind splitting the proposed optimizations into individual commits (or PRs), one commit per optimization, if that's possible? Thanks.





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r574972701



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -523,29 +805,25 @@ def __init__(
       num_buffers,  # type: int
       key=None,
       reverse=False,
-      weighted=False):
-    def _comparator(a, b):
-      if key:
-        a, b = key(a), key(b)
-
-      retval = int(a > b) - int(a < b)
-
-      if reverse:
-        return -retval
-
-      return retval
-
-    self._comparator = _comparator
-
+      weighted=False,
+      batch_input=False):
     self._num_quantiles = num_quantiles
-    self._buffer_size = buffer_size
-    self._num_buffers = num_buffers
-    if weighted:
-      self._key = (lambda x: x[0]) if key is None else (lambda x: key(x[0]))
-    else:
-      self._key = key
-    self._reverse = reverse
-    self._weighted = weighted
+    self._spec = _QuantileSpec(buffer_size, num_buffers, weighted, key, reverse)
+    self._batch_input = batch_input
+    if self._batch_input:
+      setattr(self, 'add_input', self._add_inputs)

Review comment:
       Just realized that the direct assignment causes a lint error:
   ```
   03:29:45 apache_beam/transforms/stats.py:837: error: Cannot assign to a method  [assignment]
   ```
   Changed back to `setattr`.
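
For readers unfamiliar with the pattern, here is a minimal sketch of the rebinding discussed above. The class `BatchAwareCombiner` and its list-based state are hypothetical stand-ins, not Beam's actual combiner; only the `setattr` call mirrors the diff.

```python
class BatchAwareCombiner:
  """Hypothetical stand-in for the combiner; not Beam's actual class."""

  def __init__(self, batch_input=False):
    self._batch_input = batch_input
    if self._batch_input:
      # Writing `self.add_input = self._add_inputs` is what produced the
      # "Cannot assign to a method" error quoted above. setattr performs
      # the same runtime rebinding on the instance.
      setattr(self, 'add_input', self._add_inputs)

  def add_input(self, state, element):
    # Default path: one element per call.
    state.append(element)
    return state

  def _add_inputs(self, state, elements):
    # Batch path: one iterable of elements per call.
    state.extend(elements)
    return state
```

With `batch_input=True`, calls to `add_input` on that instance dispatch to `_add_inputs`, because the instance attribute shadows the method defined on the class.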







[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (8fff438) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.30%`.
   > The diff coverage is `81.81%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   82.78%   +0.30%     
   ==========================================
     Files         455      466      +11     
     Lines       54876    57589    +2713     
   ==========================================
   + Hits        45266    47677    +2411     
   - Misses       9610     9912     +302     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.97% <ø> (+0.67%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.11% <ø> (-4.30%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [137 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...8fff438](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r574175825



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -61,30 +58,34 @@
 K = typing.TypeVar('K')
 V = typing.TypeVar('V')
 
+try:
+  import mmh3  # pylint: disable=import-error
 
-def _get_default_hash_fn():
-  """Returns either murmurhash or md5 based on installation."""
-  try:
-    import mmh3  # pylint: disable=import-error
+  def _mmh3_hash(value):
+    # mmh3.hash64 returns two 64-bit unsigned integers
+    return mmh3.hash64(value, seed=0, signed=False)[0]
+
+  _default_hash_fn = _mmh3_hash
+  _default_hash_fn_type = 'mmh3'
+except ImportError:
 
-    def _mmh3_hash(value):
-      # mmh3.hash64 returns two 64-bit unsigned integers
-      return mmh3.hash64(value, seed=0, signed=False)[0]
+  def _md5_hash(value):
+    # md5 is a 128-bit hash, so we truncate the hexdigest (string of 32
+    # hexadecimal digits) to 16 digits and convert to int to get the 64-bit
+    # integer fingerprint.
+    return int(hashlib.md5(value).hexdigest()[:16], 16)
 
-    return _mmh3_hash
+  _default_hash_fn = _md5_hash
+  _default_hash_fn_type = 'md5'
 
-  except ImportError:
+
+def _get_default_hash_fn():
+  """Returns either murmurhash or md5 based on installation."""
+  if _default_hash_fn_type == 'md5':
     logging.warning(
         'Couldn\'t find murmurhash. Install mmh3 for a faster implementation of'

Review comment:
       I wouldn't make it a dependency until it releases wheels.
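
As a rough illustration of keeping `mmh3` optional rather than required, the fallback in the diff can be reduced to the sketch below. The names `md5_fingerprint` and `default_hash_fn` are made up for this example; the `mmh3.hash64` call and the md5 truncation follow the diff above.

```python
import hashlib

try:
  import mmh3  # optional third-party package; may be absent
except ImportError:
  mmh3 = None


def md5_fingerprint(value):
  # md5 yields 128 bits (32 hex digits); keep the first 16 digits
  # to obtain a 64-bit integer fingerprint.
  return int(hashlib.md5(value).hexdigest()[:16], 16)


def default_hash_fn():
  """Returns the mmh3-based hash when available, else the md5 fallback."""
  if mmh3 is not None:
    return lambda value: mmh3.hash64(value, seed=0, signed=False)[0]
  return md5_fingerprint
```

Either branch returns a callable that maps bytes to an unsigned 64-bit integer, so callers do not need to know which library is installed.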







[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-745345591


   retest this please





[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-773783934


   Thanks for the review, Valentyn!





[GitHub] [beam] tvalentyn commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r573234373



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -636,132 +895,33 @@ def _offset(self, new_weight):
       self._offset_jitter = 2 - self._offset_jitter
       return (new_weight + self._offset_jitter) / 2
 
-  def _collapse(self, buffers):
-    # type: (Iterable[_QuantileBuffer[T]]) -> _QuantileBuffer[T]
-    new_level = 0
-    new_weight = 0
-    for buffer_elem in buffers:
-      # As presented in the paper, there should always be at least two
-      # buffers of the same (minimal) level to collapse, but it is possible
-      # to violate this condition when combining buffers from independently
-      # computed shards.  If they differ we take the max.
-      new_level = max([new_level, buffer_elem.level + 1])
-      new_weight = new_weight + buffer_elem.weight
-    if self._weighted:
-      step = new_weight / (self._buffer_size - 1)
-      offset = new_weight / (2 * self._buffer_size)
-    else:
-      step = new_weight
-      offset = self._offset(new_weight)
-    new_elements = self._interpolate(buffers, self._buffer_size, step, offset)
-    return _QuantileBuffer(new_elements, self._weighted, new_level, new_weight)
-
-  def _collapse_if_needed(self, qs):
-    # type: (_QuantileState) -> None
-    while len(qs.buffers) > self._num_buffers:
-      to_collapse = []
-      to_collapse.append(heapq.heappop(qs.buffers))
-      to_collapse.append(heapq.heappop(qs.buffers))
-      min_level = to_collapse[1].level
-
-      while len(qs.buffers) > 0 and qs.buffers[0].level == min_level:
-        to_collapse.append(heapq.heappop(qs.buffers))
-
-      heapq.heappush(qs.buffers, self._collapse(to_collapse))
-
-  def _interpolate(self, i_buffers, count, step, offset):
-    """
-    Emulates taking the ordered union of all elements in buffers, repeated
-    according to their weight, and picking out the (k * step + offset)-th
-    elements of this list for `0 <= k < count`.
-    """
-
-    iterators = []
-    new_elements = []
-    compare_key = self._key
-    if self._key and not self._weighted:
-      compare_key = lambda x: self._key(x[0])
-    for buffer_elem in i_buffers:
-      iterators.append(buffer_elem.sized_iterator())
-
-    # Python 3 `heapq.merge` support key comparison and returns an iterator and
-    # does not pull the data into memory all at once. Python 2 does not
-    # support comparison on its `heapq.merge` api, so we use the itertools
-    # which takes the `key` function for comparison and creates an iterator
-    # from it.
-    if sys.version_info[0] < 3:
-      sorted_elem = iter(
-          sorted(
-              itertools.chain.from_iterable(iterators),
-              key=compare_key,
-              reverse=self._reverse))
-    else:
-      sorted_elem = heapq.merge(
-          *iterators, key=compare_key, reverse=self._reverse)
-
-    weighted_element = next(sorted_elem)
-    current = weighted_element[1]
-    j = 0
-    previous = 0
-    while j < count:
-      target = j * step + offset
-      j = j + 1
-      try:
-        while current <= target:
-          weighted_element = next(sorted_elem)
-          current = current + weighted_element[1]
-      except StopIteration:
-        pass
-      if self._weighted:
-        new_elements.append((weighted_element[0], current - previous))
-        previous = current
-      else:
-        new_elements.append(weighted_element[0])
-    return new_elements
-
   # TODO(BEAM-7746): Signature incompatible with supertype
   def create_accumulator(self):  # type: ignore[override]
-    # type: () -> _QuantileState[T]
+    # type: () -> _QuantileState
     self._qs = _QuantileState(
-        buffer_size=self._buffer_size,
-        num_buffers=self._num_buffers,
         unbuffered_elements=[],
-        buffers=[])
+        unbuffered_weights=[],
+        buffers=[],
+        spec=self._spec)
     return self._qs
 
   def add_input(self, quantile_state, element):
     """
     Add a new element to the collection being summarized by quantile state.
     """
-    value = element[0] if self._weighted else element
-    if quantile_state.is_empty():
-      quantile_state.min_val = quantile_state.max_val = value
-    elif self._comparator(value, quantile_state.min_val) < 0:
-      quantile_state.min_val = value
-    elif self._comparator(value, quantile_state.max_val) > 0:
-      quantile_state.max_val = value
-    self._add_unbuffered(quantile_state, elements=[element])
+    quantile_state.add_unbuffered([element], self._offset)

Review comment:
       There is no big difference if they are static, but I thought that if they were instance methods, you perhaps wouldn't have to pass spec or offset_fn. No strong opinion, feel free to leave as is.
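
The trade-off mentioned in this comment might look like the sketch below. All names here (`QuantileSpec`, `StateWithStaticHelper`, `StateWithInstanceHelper`, `is_full`) are hypothetical and only loosely modeled on the PR; they are not the actual Beam classes.

```python
class QuantileSpec:
  """Hypothetical configuration holder, loosely modeled on _QuantileSpec."""

  def __init__(self, buffer_size):
    self.buffer_size = buffer_size


class StateWithStaticHelper:
  def __init__(self):
    self.unbuffered = []

  @staticmethod
  def is_full(unbuffered, spec):
    # Static variant: the caller must supply the spec on every call.
    return len(unbuffered) >= spec.buffer_size


class StateWithInstanceHelper:
  def __init__(self, spec):
    self.spec = spec
    self.unbuffered = []

  def is_full(self):
    # Instance variant: the spec is read from self, so nothing is passed in.
    return len(self.unbuffered) >= self.spec.buffer_size
```

Both variants compute the same result; the difference is only whether the configuration travels with the object or with each call.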







[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (ad95556) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.52%`.
   > The diff coverage is `95.34%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   83.01%   +0.52%     
   ==========================================
     Files         455      469      +14     
     Lines       54876    58330    +3454     
   ==========================================
   + Hits        45266    48424    +3158     
   - Misses       9610     9906     +296     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.07% <ø> (-0.22%)` | :arrow_down: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `87.50% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.07% <ø> (-4.34%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [148 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...ad95556](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-768604600


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=h1) Report
   > Merging [#13175](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=desc) (8fff438) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) (3d6cc0e) will **increase** coverage by `0.30%`.
   > The diff coverage is `81.81%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13175/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13175      +/-   ##
   ==========================================
   + Coverage   82.48%   82.78%   +0.30%     
   ==========================================
     Files         455      466      +11     
     Lines       54876    57589    +2713     
   ==========================================
   + Hits        45266    47677    +2411     
   - Misses       9610     9912     +302     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/dataframe/frames.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL2ZyYW1lcy5weQ==) | `91.97% <ø> (+0.67%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/partitionings.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3BhcnRpdGlvbmluZ3MucHk=) | `91.39% <ø> (+2.35%)` | :arrow_up: |
   | [sdks/python/apache\_beam/dataframe/transforms.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZGF0YWZyYW1lL3RyYW5zZm9ybXMucHk=) | `94.71% <ø> (-0.83%)` | :arrow_down: |
   | [...s/python/apache\_beam/examples/snippets/snippets.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vZXhhbXBsZXMvc25pcHBldHMvc25pcHBldHMucHk=) | `76.97% <ø> (-12.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/internal/metrics/\_\_init\_\_.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9fX2luaXRfXy5weQ==) | `100.00% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/cells.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9jZWxscy5weQ==) | `72.41% <ø> (ø)` | |
   | [sdks/python/apache\_beam/internal/metrics/metric.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW50ZXJuYWwvbWV0cmljcy9tZXRyaWMucHk=) | `86.45% <ø> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5LnB5) | `75.11% <ø> (-4.30%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.70% <ø> (-0.09%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/gcsio.py](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2djc2lvLnB5) | `90.54% <ø> (-0.19%)` | :arrow_down: |
   | ... and [137 more](https://codecov.io/gh/apache/beam/pull/13175/diff?src=pr&el=tree-more) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=footer). Last update [c0a7e66...ad95556](https://codecov.io/gh/apache/beam/pull/13175?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] iindyk commented on a change in pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on a change in pull request #13175:
URL: https://github.com/apache/beam/pull/13175#discussion_r570715768



##########
File path: sdks/python/apache_beam/transforms/stats.py
##########
@@ -501,6 +781,8 @@ class ApproximateQuantilesCombineFn(CombineFn, Generic[T]):
     weighted: (optional) if set to True, the combiner produces weighted
       quantiles. The input elements are then expected to be tuples of input
       values with the corresponding weight.
+    batch_input: (optional) if set to True, inputs are expected to be batches of

Review comment:
       Done.







[GitHub] [beam] tvalentyn edited a comment on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
tvalentyn edited a comment on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-759833658


   Took a quick look. I think I'd have to read through the original paper to form an informed opinion on the change; unless @iindyk has a reviewer in mind who is already familiar with the algorithm, I am happy to take a look. It will take me a bit of time to get up to speed on the algorithm, but it looks interesting.
   @iindyk - are the optimizations in this PR also described in [MRL98], or is your change an optimization on top of that algorithm? Also, would you mind splitting the proposed optimizations into individual commits (or PRs), one commit per optimization, if that's possible? Thanks.





[GitHub] [beam] iindyk commented on pull request #13175: Adding performance improvements to ApproximateQuantiles.

Posted by GitBox <gi...@apache.org>.
iindyk commented on pull request #13175:
URL: https://github.com/apache/beam/pull/13175#issuecomment-731104244


   I'd be happy to address Robert's comment about inheritance for type checking, but the suggestion doesn't work for me at the moment; see the discussion above. So I'm waiting on a comment from @robertwb.

