Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/10/09 14:05:39 UTC

[GitHub] [beam] kamilwu opened a new pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

kamilwu opened a new pull request #13048:
URL: https://github.com/apache/beam/pull/13048


   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/) <br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | --- | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r514395054



##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -411,6 +411,33 @@ def visit_transform(self, transform_node):
 
     return FlattenInputVisitor()
 
+  @staticmethod
+  def combinefn_visitor():
+    # Imported here to avoid circular dependencies.
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam import core
+
+    class CombineFnVisitor(PipelineVisitor):
+      """Checks if `CombineFn` has non-default setup or teardown methods.
+      If yes, raises `ValueError`.
+      """
+      def visit_transform(self, applied_transform):
+        transform = applied_transform.transform
+        if isinstance(transform, core.ParDo) and isinstance(
+            transform.fn, core.CombineValuesDoFn):
+          if self._overrides_setup_or_teardown(transform.fn.combinefn):
+            raise ValueError(
+                'CombineFn.setup and CombineFn.teardown are '
+                'not supported with non-portable Dataflow '
+                'runner. Please use Dataflow Runner V2 instead.')

Review comment:
       I think this question is for the Dataflow team. From my perspective, there's no need to support this in non-portable Dataflow, given that new batch pipelines will start using Dataflow Runner V2 in a month (December 4).
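The visitor in the hunk above calls a helper, `_overrides_setup_or_teardown`, whose body is not part of the hunk. A minimal sketch of how such a check could work, assuming it simply compares each hook on the CombineFn's class with the base class's default no-op. `CombineFn` here is a hypothetical stand-in for the SDK class, and `overrides_setup_or_teardown`, `SumFn` and `ClientBackedFn` are illustrative names only.

```python
class CombineFn(object):
  """Hypothetical stand-in for the SDK base class; only the lifecycle hooks are modeled."""

  def setup(self, *args, **kwargs):
    # Default no-op; a subclass may acquire resources here.
    pass

  def teardown(self, *args, **kwargs):
    # Default no-op; a subclass may release resources here.
    pass


def overrides_setup_or_teardown(combinefn):
  """Returns True if the instance's class replaces either default hook."""
  cls = type(combinefn)
  # Compare the functions looked up on the class, not bound methods on the
  # instance: a bound method is a new object on every lookup.
  return (cls.setup is not CombineFn.setup or
          cls.teardown is not CombineFn.teardown)


class SumFn(CombineFn):
  """Inherits both default hooks."""


class ClientBackedFn(CombineFn):
  """Overrides setup, so the check should flag it."""

  def setup(self, *args, **kwargs):
    self.client = object()  # placeholder for an expensive resource
```

With this approach `overrides_setup_or_teardown(SumFn())` is false and `overrides_setup_or_teardown(ClientBackedFn())` is true.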







[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-707164677


   > Noting one difference compared to similar DoFn functions: CombineFns may have hierarchies (e.g. `_TupleCombineFnBase`).
   
   Thanks, I totally missed that! There are even more of them (e.g. `TypeCheckCombineFn`, `_CurriedFn`). I will update all of them.
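Because some CombineFns wrap others, a wrapper has to forward the lifecycle hooks to every fn it wraps; otherwise the inner fn's `setup` never runs. A minimal sketch of that forwarding, using hypothetical classes (`CombineFn`, `TupleCombineFn`, `RecordingFn`) rather than the SDK's real `_TupleCombineFnBase`, `TypeCheckCombineFn` or `_CurriedFn`:

```python
class CombineFn(object):
  """Hypothetical stand-in for the SDK base class."""

  def setup(self, *args, **kwargs):
    pass

  def teardown(self, *args, **kwargs):
    pass


class TupleCombineFn(CombineFn):
  """Hypothetical wrapper that combines each tuple position with its own fn."""

  def __init__(self, *combinefns):
    self._combinefns = combinefns

  def setup(self, *args, **kwargs):
    # Forward to every wrapped fn so each one can acquire its resources.
    for fn in self._combinefns:
      fn.setup(*args, **kwargs)

  def teardown(self, *args, **kwargs):
    # Forward teardown too, so resources acquired in setup are released.
    for fn in self._combinefns:
      fn.teardown(*args, **kwargs)


class RecordingFn(CombineFn):
  """Records lifecycle calls so the forwarding can be observed."""

  def __init__(self):
    self.events = []

  def setup(self, *args, **kwargs):
    self.events.append('setup')

  def teardown(self, *args, **kwargs):
    self.events.append('teardown')
```

Calling `setup()` and then `teardown()` on `TupleCombineFn(a, b)` leaves both `a.events` and `b.events` equal to `['setup', 'teardown']`.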





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [15fd04d...74dd004](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] tvalentyn commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-722025568


   I am seeing an error in one of the tests that we run internally with this change:
   
   ```
   test_global_fanout (apache_beam.transforms.combiners_test.CombineTest) ... ERROR
   
   ======================================================================
   ERROR: test_global_fanout (apache_beam.transforms.combiners_test.CombineTest)
   ----------------------------------------------------------------------
   Traceback (most recent call last):
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/transforms/combiners_test.py", line 544, in test_global_fanout
       assert_that(result, equal_to([49.5]))
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 568, in __exit__
       self.result = self.run()
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/testing/test_pipeline.py", line 112, in run
       False if self.not_use_test_runner_api else test_runner_api))
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 518, in run
       allow_proto_holders=True).run(False)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 547, in run
       return self.runner.run_pipeline(self, self._options)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/runners/dataflow/test_dataflow_runner.py", line 57, in run_pipeline
       self).run_pipeline(pipeline, options)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/runners/dataflow/dataflow_runner.py", line 495, in run_pipeline
       self._check_for_unsupported_features_on_non_portable_worker(pipeline)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/runners/dataflow/dataflow_runner.py", line 467, in _check_for_unsupported_features_on_non_portable_worker
       pipeline.visit(self.combinefn_visitor())
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 594, in visit
       self._root_transform().visit(visitor, self, visited)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 1085, in visit
       part.visit(visitor, pipeline, visited)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 1085, in visit
       part.visit(visitor, pipeline, visited)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 1085, in visit
       part.visit(visitor, pipeline, visited)
     [Previous line repeated 2 more times]
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/pipeline.py", line 1088, in visit
       visitor.visit_transform(self)
     File "/build/work/12791c230bd5926c2b95fef2c4fb1814e38e/google3/runfiles/google3/third_party/py/apache_beam/runners/dataflow/dataflow_runner.py", line 430, in visit_transform
       'CombineFn.setup and CombineFn.teardown are '
   ValueError: CombineFn.setup and CombineFn.teardown are not supported with non-portable Dataflow runner. Please use Dataflow Runner V2 instead.
   
   ----------------------------------------------------------------------
   Ran 1 test in 1.216s
   
   FAILED (errors=1)
   ```
   
   I wonder whether this broke the ValidatesRunner tests on Runner V1.





[GitHub] [beam] kamilwu removed a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu removed a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705507128









[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.50%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54867    54910      +43     
   ==========================================
   + Hits        45272    45304      +32     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=) | `98.38% <100.00%> (+0.23%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/operations.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9ucy5weQ==) | `70.39% <100.00%> (+1.05%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `88.73% <100.00%> (+0.10%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.32% <0.00%> (-1.20%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <0.00%> (-0.57%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.23% <0.00%> (+2.38%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...44c1bd3](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-722045212


   FYI: https://github.com/apache/beam/pull/13266





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705506907









[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-708515000


   > Are setup/teardown invoked for its use in CombineFnState?
   
   @robertwb Do you mean `CombiningValueRuntimeState`? If so, they're not.
   
   I can fix that, but I wonder where setup/teardown should be invoked. The setup could be invoked on `CombiningValueRuntimeState` initialization, but what's the best place for teardown? [CombiningValueRuntimeState.clear](https://github.com/apache/beam/blob/c921b0c8384809dbcab3edcd5007e92da25067c6/sdks/python/apache_beam/runners/worker/bundle_processor.py#L498) looks good, but only if it's called exactly once per object's lifetime.
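One way to make `clear` a safe place for teardown, even if it is called more than once, is to guard the call with a flag. The sketch below is a hypothetical simplification, not the real `CombiningValueRuntimeState` in `bundle_processor.py`; it assumes setup runs when the state object is constructed and teardown runs on the first `clear`. `CountingFn` and `CombiningState` are illustrative names.

```python
class CountingFn(object):
  """Hypothetical CombineFn-like object that counts lifecycle calls."""

  def __init__(self):
    self.setup_calls = 0
    self.teardown_calls = 0

  def setup(self):
    self.setup_calls += 1

  def teardown(self):
    self.teardown_calls += 1

  def create_accumulator(self):
    return 0

  def add_input(self, accumulator, element):
    return accumulator + element


class CombiningState(object):
  """Simplified stand-in for a combining runtime state."""

  def __init__(self, combinefn):
    self._combinefn = combinefn
    self._combinefn.setup()  # acquire resources once per state object
    self._torn_down = False
    self._accumulator = combinefn.create_accumulator()

  def add(self, element):
    self._accumulator = self._combinefn.add_input(self._accumulator, element)

  def read(self):
    return self._accumulator

  def clear(self):
    self._accumulator = self._combinefn.create_accumulator()
    if not self._torn_down:
      # Guard so that repeated clear() calls tear down at most once.
      self._torn_down = True
      self._combinefn.teardown()
```

The flag only prevents a double teardown. If the state can be written again after `clear`, the fn would then be used after teardown, which is the open question about whether `clear` runs exactly once per object.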





[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r517731074



##########
File path: sdks/python/apache_beam/transforms/combiners.py
##########
@@ -700,6 +700,9 @@ def __init__(self, n):
     # helper instead.
     self._top_combiner = TopCombineFn(n)
 
+  def setup(self):
+    self._top_combiner.setup()

Review comment:
       I think this would fail the Dataflow V1 check as well.
   I thought of a possible improvement to the check [`bad0a52` (#13267)](https://github.com/apache/beam/pull/13267/commits/bad0a522ced25cc6ceef3ad14563ed2f6419788f), and this call would cause an issue with it.
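A wrapper whose `setup` only forwards to an inner fn, as in the hunk above, overrides `setup` on its own class, so a check that looks only at the outer class flags it even when nothing inside needs lifecycle support. The sketch below shows one conceivable refinement: judge a wrapper by the fns it wraps. It is not a description of what `bad0a52` does; the `wrapped_combinefns` hook and all class names are hypothetical.

```python
class CombineFn(object):
  """Hypothetical stand-in for the SDK base class."""

  def setup(self, *args, **kwargs):
    pass

  def teardown(self, *args, **kwargs):
    pass

  def wrapped_combinefns(self):
    # Hypothetical hook: wrappers return the CombineFns they delegate to.
    return ()


def class_overrides_hooks(fn):
  """Naive check: does fn's own class replace either default hook?"""
  cls = type(fn)
  return (cls.setup is not CombineFn.setup or
          cls.teardown is not CombineFn.teardown)


def needs_lifecycle_support(fn):
  """True if fn, or any fn it wraps, has a non-default setup/teardown."""
  inner = fn.wrapped_combinefns()
  if inner:
    # Assumes a wrapper's own hooks only forward, so judge it by its inner fns.
    return any(needs_lifecycle_support(f) for f in inner)
  return class_overrides_hooks(fn)


class TopFn(CombineFn):
  """Leaf fn with default hooks."""


class PooledFn(CombineFn):
  """Leaf fn that really needs setup."""

  def setup(self, *args, **kwargs):
    self.pool = []


class TopWrapper(CombineFn):
  """Mirrors the pattern in the hunk: hooks only forward to an inner fn."""

  def __init__(self, inner):
    self._inner = inner

  def setup(self, *args, **kwargs):
    self._inner.setup(*args, **kwargs)

  def teardown(self, *args, **kwargs):
    self._inner.teardown(*args, **kwargs)

  def wrapped_combinefns(self):
    return (self._inner,)
```

Here `class_overrides_hooks(TopWrapper(TopFn()))` is true while `needs_lifecycle_support(TopWrapper(TopFn()))` is false. The trade-off is that a wrapper whose hooks do real work beyond forwarding would be missed unless it reports itself some other way.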
   
   







[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.50%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54867    54910      +43     
   ==========================================
   + Hits        45272    45304      +32     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=) | `100.00% <100.00%> (+1.85%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/operations.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9ucy5weQ==) | `70.39% <100.00%> (+1.05%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `88.73% <100.00%> (+0.10%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.32% <0.00%> (-1.20%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <0.00%> (-0.57%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...44c1bd3](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-722036092


   I don't know how to fix this at the moment, though. I think we can disable the visitor for now to keep the tests working. 



[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-712923113


   R: @aaltay @tvalentyn @robertwb 
   
   All fixes are ready. Setup and teardown will be invoked for every `CombineFn` associated with a `CombiningValueRuntimeState`. This required adding a new `finalize` method to the `RuntimeState` interface. Only `CombiningValueRuntimeState` implements this method.
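The shape of that change can be sketched in plain Python. This is a minimal sketch under stated assumptions, not the PR's code. `RuntimeState`, `CombiningValueRuntimeState`, and `finalize` are the names used in the comment above, but their bodies here are hypothetical, and `SumFn` is an illustrative stand-in for a `CombineFn`. The base `finalize` is a no-op, mirroring the no-op default of `prefetch`, and only the combining state overrides it to run `teardown`. Calling `setup` from the constructor is an assumption.

```python
class RuntimeState:
    """Hypothetical stand-in for the RuntimeState interface."""

    def prefetch(self):
        pass  # default implementation does nothing

    def finalize(self):
        pass  # default implementation does nothing: most states hold no resources


class CombiningValueRuntimeState(RuntimeState):
    """Hypothetical sketch: a state that owns a CombineFn and drives its lifecycle."""

    def __init__(self, combine_fn):
        self._combine_fn = combine_fn
        self._combine_fn.setup()  # assumption: setup runs when the state is created
        self._accumulator = combine_fn.create_accumulator()

    def add(self, value):
        self._accumulator = self._combine_fn.add_input(self._accumulator, value)

    def read(self):
        return self._combine_fn.extract_output(self._accumulator)

    def finalize(self):
        self._combine_fn.teardown()  # release whatever setup acquired


class SumFn:
    """Illustrative CombineFn-like class that records lifecycle calls."""

    def __init__(self):
        self.events = []

    def setup(self):
        self.events.append("setup")

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def extract_output(self, accumulator):
        return accumulator

    def teardown(self):
        self.events.append("teardown")


fn = SumFn()
state = CombiningValueRuntimeState(fn)
for value in range(5):
    state.add(value)
result = state.read()  # 0 + 1 + 2 + 3 + 4

# Hypothetical end-of-bundle step: finalize every state uniformly.
# Only the combining state does any work.
for s in (state, RuntimeState()):
    s.finalize()
```

Putting the no-op default on the base class lets the caller finalize every state without type checks.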



[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-712262038


   Run Python Dataflow V2 ValidatesRunner



[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503332709



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1970,10 +1985,14 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.copy(

Review comment:
       Better protection against potential side effects.
   
   When default values are used, `CombineFn.apply` is called at pipeline construction time, and `CombineFn.setup` and `CombineFn.teardown` are called along with it. The same `CombineFn` instance is then serialized and sent to the runner. I think it is better to perform the initial `CombineFn.apply` on a copy, so that the state of the original instance is not polluted.
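Here is a minimal sketch of the side effect being avoided. It assumes, as the comment states, that `apply` runs `setup` and `teardown` itself. `LifecycleFlagsFn` is a hypothetical CombineFn-like class, and its `apply` is a simplified stand-in rather than Beam's implementation.

```python
import copy


class LifecycleFlagsFn:
    """Hypothetical CombineFn-like class that remembers lifecycle calls."""

    def __init__(self):
        self.setup_called = False
        self.teardown_called = False

    def setup(self):
        self.setup_called = True

    def teardown(self):
        self.teardown_called = True

    def apply(self, elements):
        # Simplified: run the whole lifecycle once over in-memory elements.
        self.setup()
        try:
            return sum(elements)
        finally:
            self.teardown()


# The construction-time default is computed on a copy, so the instance that
# would be serialized for the runner keeps its pristine flags.
fn = LifecycleFlagsFn()
default_value = copy.copy(fn).apply([])

# Without the copy, the instance shipped to the runner already looks torn down.
polluted = LifecycleFlagsFn()
polluted.apply([])
```

Consider a fn that checks call order, like `CallSequenceEnforcingCombineFn` in this PR's test file. If it received an instance in the `polluted` state, it would fail, because its `setup` asserts that neither `setup` nor `teardown` has run before.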





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-721686036


   Thanks @tvalentyn. We are only waiting for an answer from @robertwb [here](https://github.com/apache/beam/pull/13048#discussion_r516397331). Is that correct?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r514402275



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -877,17 +877,19 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   combining process proceeds as follows:
 
   1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
+  2. For each batch, the setup method is invoked.
+  3. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_input method is invoked to
+  4. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
-  4. The merge_accumulators method is invoked to combine accumulators from
+  5. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
      of the accumulators have had all the input value in their batches added to
      them. This operation is invoked repeatedly, until there is only one
      accumulator value left.
-  5. The extract_output operation is invoked on the final accumulator to get
+  6. The extract_output operation is invoked on the final accumulator to get
      the output value.
+  7. The teardown method is invoked.

Review comment:
       CombineFn's teardown is similar to DoFn's teardown, which is not guaranteed to be called. We should expect the same of CombineFn's teardown.
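Because `teardown` is best-effort, resources acquired in `setup` are safest when their cleanup is idempotent and has a fallback. Below is a minimal sketch of one such pattern using only the standard library. `ScratchFileCombineFn` is hypothetical, and its combining methods are omitted. `weakref.finalize` provides the fallback: it runs when the object is garbage-collected or the interpreter exits normally. No in-process hook runs if the worker process is killed outright.

```python
import os
import tempfile
import weakref


def _remove_if_exists(path):
    """Delete path, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


class ScratchFileCombineFn:
    """Hypothetical CombineFn-like class holding a scratch file from setup."""

    def __init__(self):
        self._path = None
        self._cleanup = None

    def setup(self):
        fd, self._path = tempfile.mkstemp()
        os.close(fd)
        # Fallback if teardown never runs. The callback must not reference
        # self, or the object would never be collected.
        self._cleanup = weakref.finalize(self, _remove_if_exists, self._path)

    def teardown(self):
        if self._cleanup is not None:
            self._cleanup()  # a finalize object runs its callback at most once


fn = ScratchFileCombineFn()
fn.setup()
path = fn._path
fn.teardown()  # normal path: the file is removed
fn.teardown()  # a second call is harmless
```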





[GitHub] [beam] aaltay commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705855829


   R: @tvalentyn @yifanmai 
   /cc @chuanyu 



[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [15fd04d...6dd53ad](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `51.72%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.49%   -0.02%     
   ==========================================
     Files         455      455              
     Lines       54867    54876       +9     
   ==========================================
   - Hits        45272    45270       -2     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.80% <ø> (+0.35%)` | :arrow_up: |
   | [...beam/runners/portability/local\_job\_service\_main.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9sb2NhbF9qb2Jfc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <53.33%> (-0.57%)` | :arrow_down: |
   | [...runners/interactive/display/pcoll\_visualization.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kaXNwbGF5L3Bjb2xsX3Zpc3VhbGl6YXRpb24ucHk=) | `85.34% <77.77%> (+0.07%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.34% <0.00%> (-0.14%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.63% <0.00%> (+0.15%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...7ac6339](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [ce190e1...31e5bc3](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   



[GitHub] [beam] robertwb commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
robertwb commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r517536798



##########
File path: sdks/python/apache_beam/transforms/userstate.py
##########
@@ -357,6 +357,10 @@ def prefetch(self):
     # The default implementation here does nothing.
     pass
 
+  def finalize(self):

Review comment:
       This is fine.






[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503585696



##########
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py
##########
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""ValidatesRunner tests for CombineFn lifecycle and bundle methods."""
+
+# pytype: skip-file
+
+import unittest
+from weakref import WeakSet
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.direct import direct_runner
+from apache_beam.runners.portability import fn_api_runner
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+
+
+class CallSequenceEnforcingCombineFn(beam.CombineFn):
+  instances = WeakSet()
+
+  def __init__(self):
+    super(CallSequenceEnforcingCombineFn, self).__init__()
+    self._setup_called = False
+    self._accumulators_created = 0
+    self._teardown_called = False
+
+  def setup(self):
+    assert not self._setup_called, 'setup should not be called twice'
+    assert not self._teardown_called, 'setup should be called before teardown'
+    # Keep track of instances so that we can check if teardown is called
+    # properly after pipeline execution.
+    self.instances.add(self)
+    self._setup_called = True
+
+  def create_accumulator(self):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._accumulators_created += 1
+    return 0
+
+  def add_input(self, mutable_accumulator, element):
+    assert self._setup_called, 'setup should have been called'
+    assert self._accumulators_created > 0, \
+        'create_accumulator should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    mutable_accumulator += element
+    return mutable_accumulator
+
+  def add_inputs(self, mutable_accumulator, elements):
+    return self.add_input(mutable_accumulator, sum(elements))
+
+  def merge_accumulators(self, accumulators):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return sum(accumulators)
+
+  def extract_output(self, accumulator):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return accumulator
+
+  def teardown(self):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not be called twice'
+    self._teardown_called = True
+
+
+class BaseCombineFnLifecycleTest(unittest.TestCase):
+  def start(self, pipeline, lift_combiners=True):
+    with pipeline as p:
+      pcoll = p | 'Start' >> beam.Create(range(5))
+
+      # Certain triggers, such as AfterCount, are incompatible with combiner
+      # lifting. We can use that fact to prevent combiners from being lifted.
+      if not lift_combiners:
+        pcoll |= beam.WindowInto(
+            window.GlobalWindows(),
+            trigger=trigger.AfterCount(5),
+            accumulation_mode=trigger.AccumulationMode.DISCARDING)
+
+      pcoll |= 'Do' >> beam.CombineGlobally(CallSequenceEnforcingCombineFn())
+      assert_that(pcoll, equal_to([10]))
+
+    # Ensure that _teardown_called equals True for all CombineFns.
+    for instance in CallSequenceEnforcingCombineFn.instances:
+      self.assertTrue(instance._teardown_called)
+
+
+@attr('ValidatesRunner')
+class CombineFnLifecycleTest(BaseCombineFnLifecycleTest):
+  def setUp(self):
+    self.pipeline = TestPipeline(is_integration_test=True)
+    options = self.pipeline.get_pipeline_options()
+    standard_options = options.view_as(StandardOptions)
+    experiments = options.view_as(DebugOptions).experiments or []
+
+    if 'DataflowRunner' in standard_options.runner and \
+       not standard_options.streaming and \
+       'beam_fn_api' not in experiments and 'use_runner_v2' not in experiments:
+      self.skipTest(
+          'Non-portable Dataflow batch worker does not support '

Review comment:
       On BEAM-3736, the recommendation was to reject the job. This is more problematic with the Java SDK. AFAIK, with the Python SDK, Dataflow is the only non-portable runner (not counting the direct runner).
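One way to read "reject the job" is as a fail-fast check at submission time, rather than silently skipping the hooks. This is a sketch under assumptions. `CombineFn` below is a stand-in base class with no-op hooks, and `uses_lifecycle` and `validate_combine_fns` are hypothetical helpers, not Beam or Dataflow APIs. A hook counts as "used" when a subclass overrides it.

```python
class CombineFn:
    """Stand-in base class with no-op lifecycle hooks (not Beam's class)."""

    def setup(self):
        pass

    def teardown(self):
        pass


def uses_lifecycle(fn):
    """True if fn's class overrides setup or teardown from the base class."""
    cls = type(fn)
    return cls.setup is not CombineFn.setup or cls.teardown is not CombineFn.teardown


def validate_combine_fns(fns, runner_is_portable):
    """Hypothetical pre-submission check: reject instead of skipping hooks."""
    if runner_is_portable:
        return
    offending = sorted({type(fn).__name__ for fn in fns if uses_lifecycle(fn)})
    if offending:
        raise ValueError(
            "This runner does not support CombineFn.setup/teardown; used by: "
            + ", ".join(offending))


class PlainSumFn(CombineFn):
    pass  # no lifecycle hooks: safe on any runner


class ModelCombineFn(CombineFn):
    def setup(self):
        self.model = object()  # placeholder for an expensive resource
```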





[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503029540



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -875,18 +875,20 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   input argument, which is an instance of CombineFnProcessContext). The
   combining process proceeds as follows:
 
-  1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
+  1. The setup method is invoked.

Review comment:
       This implies that the setup method is called once for the entire collection. Wouldn't it be called per batch? Aggregation may happen on multiple workers, and I imagine that in that case each worker will call the setup/teardown methods. Should we swap steps 1 and 2?
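Here is a toy simulation of the reviewer's point, using a hypothetical driver (`run_combine`) and a hypothetical class (`TracingSumFn`). Each batch is processed by its own copy of the fn, standing in for a copy deserialized on a worker. As a result, `setup`/`teardown` run once per copy rather than once per collection. Real runners decide batching and instance reuse themselves; this only illustrates the ordering within each copy.

```python
import copy


class TracingSumFn:
    """Hypothetical CombineFn-like class that logs lifecycle calls."""

    log = []  # class attribute, shared by every copy so the driver can inspect it

    def __init__(self):
        self.name = "original"

    def setup(self):
        TracingSumFn.log.append(("setup", self.name))

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator

    def teardown(self):
        TracingSumFn.log.append(("teardown", self.name))


def run_combine(fn, batches):
    """Hypothetical driver: one fn copy per batch, plus one for the final merge."""
    accumulators = []
    for i, batch in enumerate(batches):
        worker_fn = copy.deepcopy(fn)  # stands in for deserializing on a worker
        worker_fn.name = "worker-%d" % i
        worker_fn.setup()
        try:
            acc = worker_fn.create_accumulator()
            for element in batch:
                acc = worker_fn.add_input(acc, element)
            accumulators.append(acc)
        finally:
            worker_fn.teardown()
    final_fn = copy.deepcopy(fn)
    final_fn.name = "final"
    final_fn.setup()
    try:
        return final_fn.extract_output(final_fn.merge_accumulators(accumulators))
    finally:
        final_fn.teardown()


result = run_combine(TracingSumFn(), [[0, 1], [2, 3], [4]])
```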

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -895,6 +897,15 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   def default_label(self):
     return self.__class__.__name__
 
+  def setup(self):
+    """Called to prepare an instance for combining.
+
+    This method can be useful if there is some state that needs to be loaded
+    before executing any of the other methods. The resources can then be
+    disposed in ``CombineFn.teardown``.

Review comment:
       nit: s/disposed/disposed of

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1970,10 +1985,14 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.copy(

Review comment:
       What is the reason for a shallow copy here?
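Here is the distinction the question hinges on, shown with a hypothetical class. `copy.copy` protects attributes that `setup` rebinds on the copy. It does not protect mutable objects that `setup` changes in place, because those are shared with the original. `copy.deepcopy` isolates both, but it copies everything and raises for attributes that cannot be copied, such as locks.

```python
import copy


class CachingCombineFn:
    """Hypothetical CombineFn-like class whose setup mutates a nested object."""

    def __init__(self):
        self.cache = {}     # mutable: a shallow copy shares this dict
        self.ready = False  # rebinding it on a copy does not affect the original

    def setup(self):
        self.ready = True               # rebinds the attribute on the copy only
        self.cache["model"] = "loaded"  # mutates the dict shared with the original


original = CachingCombineFn()
copy.copy(original).setup()
print(original.ready, original.cache)  # False {'model': 'loaded'}

original2 = CachingCombineFn()
copy.deepcopy(original2).setup()
print(original2.ready, original2.cache)  # False {}
```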

##########
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py
##########
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""ValidatesRunner tests for CombineFn lifecycle and bundle methods."""
+
+# pytype: skip-file
+
+import unittest
+from weakref import WeakSet
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.direct import direct_runner
+from apache_beam.runners.portability import fn_api_runner
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+
+
+class CallSequenceEnforcingCombineFn(beam.CombineFn):
+  instances = WeakSet()
+
+  def __init__(self):
+    super(CallSequenceEnforcingCombineFn, self).__init__()
+    self._setup_called = False
+    self._accumulators_created = 0
+    self._teardown_called = False
+
+  def setup(self):
+    assert not self._setup_called, 'setup should not be called twice'
+    assert not self._teardown_called, 'setup should be called before teardown'
+    # Keep track of instances so that we can check if teardown is called
+    # properly after pipeline execution.
+    self.instances.add(self)
+    self._setup_called = True
+
+  def create_accumulator(self):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._accumulators_created += 1
+    return 0
+
+  def add_input(self, mutable_accumulator, element):
+    assert self._setup_called, 'setup should have been called'
+    assert self._accumulators_created > 0, \
+        'create_accumulator should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    mutable_accumulator += element
+    return mutable_accumulator
+
+  def add_inputs(self, mutable_accumulator, elements):
+    return self.add_input(mutable_accumulator, sum(elements))
+
+  def merge_accumulators(self, accumulators):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return sum(accumulators)
+
+  def extract_output(self, accumulator):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return accumulator
+
+  def teardown(self):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not be called twice'
+    self._teardown_called = True
+
+
+class BaseCombineFnLifecycleTest(unittest.TestCase):
+  def start(self, pipeline, lift_combiners=True):
+    with pipeline as p:
+      pcoll = p | 'Start' >> beam.Create(range(5))
+
+      # Certain triggers, such as AfterCount, are incompatible with combiner
+      # lifting. We can use that fact to prevent combiners from being lifted.
+      if not lift_combiners:
+        pcoll |= beam.WindowInto(
+            window.GlobalWindows(),
+            trigger=trigger.AfterCount(5),
+            accumulation_mode=trigger.AccumulationMode.DISCARDING)
+
+      pcoll |= 'Do' >> beam.CombineGlobally(CallSequenceEnforcingCombineFn())
+      assert_that(pcoll, equal_to([10]))
+
+    # Ensure that _teardown_called equals True for all CombineFns.
+    for instance in CallSequenceEnforcingCombineFn.instances:
+      self.assertTrue(instance._teardown_called)
+
+
+@attr('ValidatesRunner')
+class CombineFnLifecycleTest(BaseCombineFnLifecycleTest):
+  def setUp(self):
+    self.pipeline = TestPipeline(is_integration_test=True)
+    options = self.pipeline.get_pipeline_options()
+    standard_options = options.view_as(StandardOptions)
+    experiments = options.view_as(DebugOptions).experiments or []
+
+    if 'DataflowRunner' in standard_options.runner and \
+       not standard_options.streaming and \
+       'beam_fn_api' not in experiments and 'use_runner_v2' not in experiments:
+      self.skipTest(
+          'Non-portable Dataflow batch worker does not support '

Review comment:
       Should we make the non-portable Dataflow runner detect usage of combiner initialization and alert the user that this functionality is unsupported?
   cc: @robertwb
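The skip condition in the test can be read as a predicate over three pipeline options. A sketch of it as a standalone function, with a hypothetical name and plain arguments in place of the `StandardOptions` and `DebugOptions` views:

```python
def lifecycle_methods_supported(runner, streaming, experiments):
    """Hypothetical helper mirroring the test's skip condition.

    Returns False only for the non-portable Dataflow batch worker:
    DataflowRunner in batch mode with neither the beam_fn_api nor the
    use_runner_v2 experiment enabled.
    """
    runner = runner or ''  # guard against an unset runner option
    experiments = experiments or []
    non_portable_dataflow_batch = (
        'DataflowRunner' in runner and
        not streaming and
        'beam_fn_api' not in experiments and
        'use_runner_v2' not in experiments)
    return not non_portable_dataflow_batch


print(lifecycle_methods_supported('DataflowRunner', False, None))  # False
print(lifecycle_methods_supported('DataflowRunner', False, ['use_runner_v2']))  # True
print(lifecycle_methods_supported('DataflowRunner', True, None))  # True
print(lifecycle_methods_supported('DirectRunner', False, None))  # True
```

A runner-side check that alerts the user could reuse the same predicate.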




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] kamilwu removed a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu removed a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705507128









[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `51.72%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.49%   -0.02%     
   ==========================================
     Files         455      455              
     Lines       54867    54876       +9     
   ==========================================
   - Hits        45272    45270       -2     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.80% <ø> (+0.35%)` | :arrow_up: |
   | [...beam/runners/portability/local\_job\_service\_main.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9sb2NhbF9qb2Jfc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <53.33%> (-0.57%)` | :arrow_down: |
   | [...runners/interactive/display/pcoll\_visualization.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kaXNwbGF5L3Bjb2xsX3Zpc3VhbGl6YXRpb24ucHk=) | `85.34% <77.77%> (+0.07%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.34% <0.00%> (-0.14%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.63% <0.00%> (+0.15%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...a0336c2](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] aaltay commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
aaltay commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705855829


   R: @tvalentyn @yifanmai 
   /cc @chuanyu 





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705507128


   Run Python Dataflow ValidatesRunner





[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r517731074



##########
File path: sdks/python/apache_beam/transforms/combiners.py
##########
@@ -700,6 +700,9 @@ def __init__(self, n):
     # helper instead.
     self._top_combiner = TopCombineFn(n)
 
+  def setup(self):
+    self._top_combiner.setup()

Review comment:
       I think this would also fail the Dataflow V1 check.
   I thought of a possible improvement to the check, and this call would cause an issue with it:
   
   [`bad0a52` (#13267)](https://github.com/apache/beam/pull/13267/commits/bad0a522ced25cc6ceef3ad14563ed2f6419788f)
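To make the concern concrete, suppose (as an assumption; the check referenced in #13267 may work differently) that the check flags any CombineFn whose `setup` or `teardown` differs from the base-class no-op. A wrapper that forwards `setup()` to its inner combiner is then flagged even when the inner combiner defines no lifecycle methods. All class and function names below are hypothetical.

```python
class BaseCombineFn(object):
    """Hypothetical stand-in for beam.CombineFn with no-op lifecycle hooks."""

    def setup(self):
        pass

    def teardown(self):
        pass


class PlainTopCombineFn(BaseCombineFn):
    """Inner combiner that does not override setup or teardown."""


class WrapperCombineFn(BaseCombineFn):
    """Delegates lifecycle calls, like the setup() forwarding under review."""

    def __init__(self, inner):
        self._inner = inner

    def setup(self):
        self._inner.setup()

    def teardown(self):
        self._inner.teardown()


def overrides_lifecycle(fn):
    """Naive check: does type(fn) replace the base-class setup/teardown?"""
    return any(
        getattr(type(fn), name) is not getattr(BaseCombineFn, name)
        for name in ('setup', 'teardown'))


print(overrides_lifecycle(PlainTopCombineFn()))  # False
print(overrides_lifecycle(WrapperCombineFn(PlainTopCombineFn())))  # True
```

Avoiding this false positive would require the check to look through delegating wrappers, which is not straightforward to do generically.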







[GitHub] [beam] tvalentyn commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-722039186


   > I don't have an idea how to fix this at the moment though. I think we can disable the visitor for now to keep the tests working.
   
   OK. We can do that and mention in the changelog and in the setup/teardown docstrings that this feature is not supported on Dataflow without Runner v2.
   I am also looking into what it would take to add support for this in Dataflow Runner v1.





[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503585072



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1970,10 +1985,14 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.copy(

Review comment:
       I see. I wonder whether calling setup/teardown during pipeline submission may be undesirable in some cases.
   One option that offers more flexibility is to introduce a default_value() method in CombineFn and move the setup/teardown calls into that method. I'm trying this out in https://github.com/apache/beam/pull/13081.
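A sketch of the `default_value()` idea, assuming the default implementation sets up a copy, combines an empty input, and tears the copy down. The class names are hypothetical, and the signature tried in PR 13081 may differ. A subclass whose setup must not run at submission time can override `default_value()` instead.

```python
import copy


class CombineFnSketch(object):
    """Hypothetical base class; not the API adopted in Beam."""

    def setup(self):
        pass

    def teardown(self):
        pass

    def create_accumulator(self):
        raise NotImplementedError

    def add_input(self, accumulator, element):
        raise NotImplementedError

    def extract_output(self, accumulator):
        raise NotImplementedError

    def apply(self, elements):
        accumulator = self.create_accumulator()
        for element in elements:
            accumulator = self.add_input(accumulator, element)
        return self.extract_output(accumulator)

    def default_value(self):
        # Default behavior: set up a copy, combine empty input, tear down.
        fn = copy.copy(self)
        fn.setup()
        try:
            return fn.apply([])
        finally:
            fn.teardown()


class Sum(CombineFnSketch):
    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def extract_output(self, accumulator):
        return accumulator


class ExpensiveSetupMax(CombineFnSketch):
    def setup(self):
        raise RuntimeError('setup must not run at pipeline submission')

    def create_accumulator(self):
        return None

    def add_input(self, accumulator, element):
        return element if accumulator is None else max(accumulator, element)

    def extract_output(self, accumulator):
        return accumulator

    def default_value(self):
        # Override: the empty-input result is known without any setup.
        return None


print(Sum().default_value())  # 0
print(ExpensiveSetupMax().default_value())  # None
```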
   







[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503915735



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1970,10 +1985,14 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.copy(

Review comment:
       Left some comments over there







[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503350536



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -875,18 +875,20 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   input argument, which is an instance of CombineFnProcessContext). The
   combining process proceeds as follows:
 
-  1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
+  1. The setup method is invoked.

Review comment:
       It probably should be like this:
   
   ```
   1. Input values are partitioned into one or more batches.
   2. For each batch, the setup method is invoked.
   3. For each batch, the create_accumulator method is invoked...
   ```
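The proposed ordering can be simulated in plain Python. `RecordingFn` and `combine_in_batches` are hypothetical; the one-instance-per-batch choice and the placement of `merge_accumulators`, `extract_output`, and `teardown` are assumptions, since the proposal only fixes the first three steps.

```python
class RecordingFn(object):
    """Hypothetical CombineFn-like class that records its call sequence."""

    def __init__(self, log):
        self.log = log

    def setup(self):
        self.log.append('setup')

    def create_accumulator(self):
        self.log.append('create_accumulator')
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def merge_accumulators(self, accumulators):
        self.log.append('merge_accumulators')
        return sum(accumulators)

    def extract_output(self, accumulator):
        self.log.append('extract_output')
        return accumulator

    def teardown(self):
        self.log.append('teardown')


def combine_in_batches(elements, batch_size):
    # 1. Input values are partitioned into one or more batches.
    batches = [elements[i:i + batch_size]
               for i in range(0, len(elements), batch_size)]
    accumulators, batch_logs = [], []
    for batch in batches:
        log = []
        fn = RecordingFn(log)  # assumed: a fresh instance per batch
        fn.setup()  # 2. For each batch, setup is invoked...
        accumulator = fn.create_accumulator()  # 3. ...then create_accumulator.
        for element in batch:
            accumulator = fn.add_input(accumulator, element)
        accumulators.append(accumulator)
        fn.teardown()
        batch_logs.append(log)
    # Assumed: the final merge runs on another instance with its own lifecycle.
    final_log = []
    final_fn = RecordingFn(final_log)
    final_fn.setup()
    result = final_fn.extract_output(final_fn.merge_accumulators(accumulators))
    final_fn.teardown()
    return result, batch_logs, final_log


result, batch_logs, final_log = combine_in_batches(list(range(5)), batch_size=2)
print(result)  # 10
print(batch_logs[0])  # ['setup', 'create_accumulator', 'teardown']
print(final_log)  # ['setup', 'merge_accumulators', 'extract_output', 'teardown']
```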
   







[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503336032



##########
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py
##########
+    if 'DataflowRunner' in standard_options.runner and \
+       not standard_options.streaming and \
+       'beam_fn_api' not in experiments and 'use_runner_v2' not in experiments:
+      self.skipTest(
+          'Non-portable Dataflow batch worker does not support '

Review comment:
       Yes, it's probably a good idea. 
   Should we raise an exception and exit the program abnormally if user-provided setup and teardown are detected, or just inform the user that those methods won't be called?
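The two options can be sketched as a single validation hook with a `strict` flag. Everything here is hypothetical (`BaseCombineFn` stands in for `beam.CombineFn`; `validate_combine_fn` and `UnsupportedFeatureError` are not Beam APIs); it only shows how the raise-or-warn choice could be wired.

```python
import warnings


class BaseCombineFn(object):
    """Hypothetical stand-in for beam.CombineFn with no-op lifecycle hooks."""

    def setup(self):
        pass

    def teardown(self):
        pass


class UnsupportedFeatureError(Exception):
    """Hypothetical error type for this sketch."""


def validate_combine_fn(fn, runner_supports_lifecycle, strict):
    """Raises (strict=True) or warns (strict=False) if hooks would be ignored."""
    if runner_supports_lifecycle:
        return
    overridden = [
        name for name in ('setup', 'teardown')
        if getattr(type(fn), name) is not getattr(BaseCombineFn, name)]
    if not overridden:
        return
    message = '%s defines %s, which this runner will not call.' % (
        type(fn).__name__, ' and '.join(overridden))
    if strict:
        raise UnsupportedFeatureError(message)  # stop pipeline construction
    warnings.warn(message)  # continue, but tell the user


class UsesSetup(BaseCombineFn):
    def setup(self):
        self.client = 'connected'


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter('always')
    validate_combine_fn(UsesSetup(), runner_supports_lifecycle=False, strict=False)
print(str(caught[0].message))  # UsesSetup defines setup, which this runner will not call.
```

Raising fails fast, so a user never runs a pipeline whose teardown silently never happens; warning keeps existing pipelines running but is easy to miss in worker logs.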







[GitHub] [beam] tvalentyn commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-716887890


   Thanks, @kamilwu . I'll try to get to this and related changes this week, sorry for the delay.





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [64ec3da...e811326](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.50%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54867    54910      +43     
   ==========================================
   + Hits        45272    45304      +32     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=) | `98.38% <100.00%> (+0.23%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/operations.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9ucy5weQ==) | `70.39% <100.00%> (+1.05%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `88.73% <100.00%> (+0.10%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.32% <0.00%> (-1.20%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <0.00%> (-0.57%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.23% <0.00%> (+2.38%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...a0d436a](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `51.72%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.49%   -0.02%     
   ==========================================
     Files         455      455              
     Lines       54867    54876       +9     
   ==========================================
   - Hits        45272    45270       -2     
   - Misses       9595     9606      +11     
   ```
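The percentages in this report follow from the Hits and Lines rows: coverage is hits divided by lines, and misses are lines minus hits. The small sketch below recomputes them. The reported figures are consistent with truncating to two decimals rather than rounding (45270 / 54876 is 82.495...%, and the report shows 82.49%). That is an inference from these numbers, not documented Codecov behavior, and `coverage_bp` and `fmt` are hypothetical helpers.

```python
# (hits, lines) copied from the coverage diff above.
BASE = (45272, 54867)  # master
HEAD = (45270, 54876)  # #13048


def coverage_bp(hits, lines):
    """Coverage in hundredths of a percent, truncated (assumed, not rounded)."""
    return hits * 10000 // lines


def fmt(bp):
    """Format hundredths of a percent as a signed percentage string."""
    sign = "-" if bp < 0 else ""
    bp = abs(bp)
    return f"{sign}{bp // 100}.{bp % 100:02d}%"


base_bp = coverage_bp(*BASE)
head_bp = coverage_bp(*HEAD)
misses = (BASE[1] - BASE[0], HEAD[1] - HEAD[0])

print(fmt(base_bp), fmt(head_bp), fmt(head_bp - base_bp), misses)
# 82.51% 82.49% -0.02% (9595, 9606)
```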
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.80% <ø> (+0.35%)` | :arrow_up: |
   | [...beam/runners/portability/local\_job\_service\_main.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9sb2NhbF9qb2Jfc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <53.33%> (-0.57%)` | :arrow_down: |
   | [...runners/interactive/display/pcoll\_visualization.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kaXNwbGF5L3Bjb2xsX3Zpc3VhbGl6YXRpb24ucHk=) | `85.34% <77.77%> (+0.07%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.34% <0.00%> (-0.14%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.63% <0.00%> (+0.15%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...44c1bd3](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `51.72%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.49%   -0.02%     
   ==========================================
     Files         455      455              
     Lines       54867    54876       +9     
   ==========================================
   - Hits        45272    45270       -2     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.80% <ø> (+0.35%)` | :arrow_up: |
   | [...beam/runners/portability/local\_job\_service\_main.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9sb2NhbF9qb2Jfc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <53.33%> (-0.57%)` | :arrow_down: |
   | [...runners/interactive/display/pcoll\_visualization.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kaXNwbGF5L3Bjb2xsX3Zpc3VhbGl6YXRpb24ucHk=) | `85.34% <77.77%> (+0.07%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.34% <0.00%> (-0.14%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.63% <0.00%> (+0.15%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...b96bb6f](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-721081207









[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-712848352


   Run Python PreCommit





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705510122









[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705506907


   Run Python Dataflow V2 ValidatesRunner





[GitHub] [beam] kamilwu merged pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu merged pull request #13048:
URL: https://github.com/apache/beam/pull/13048


   





[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503585696



##########
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py
##########
@@ -0,0 +1,147 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""ValidatesRunner tests for CombineFn lifecycle and bundle methods."""
+
+# pytype: skip-file
+
+import unittest
+from weakref import WeakSet
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.direct import direct_runner
+from apache_beam.runners.portability import fn_api_runner
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import trigger
+from apache_beam.transforms import window
+
+
+class CallSequenceEnforcingCombineFn(beam.CombineFn):
+  instances = WeakSet()
+
+  def __init__(self):
+    super(CallSequenceEnforcingCombineFn, self).__init__()
+    self._setup_called = False
+    self._accumulators_created = 0
+    self._teardown_called = False
+
+  def setup(self):
+    assert not self._setup_called, 'setup should not be called twice'
+    assert not self._teardown_called, 'setup should be called before teardown'
+    # Keep track of instances so that we can check if teardown is called
+    # properly after pipeline execution.
+    self.instances.add(self)
+    self._setup_called = True
+
+  def create_accumulator(self):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    self._accumulators_created += 1
+    return 0
+
+  def add_input(self, mutable_accumulator, element):
+    assert self._setup_called, 'setup should have been called'
+    assert self._accumulators_created > 0, \
+        'create_accumulator should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    mutable_accumulator += element
+    return mutable_accumulator
+
+  def add_inputs(self, mutable_accumulator, elements):
+    return self.add_input(mutable_accumulator, sum(elements))
+
+  def merge_accumulators(self, accumulators):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return sum(accumulators)
+
+  def extract_output(self, accumulator):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return accumulator
+
+  def teardown(self):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not be called twice'
+    self._teardown_called = True
+
+
+class BaseCombineFnLifecycleTest(unittest.TestCase):
+  def start(self, pipeline, lift_combiners=True):
+    with pipeline as p:
+      pcoll = p | 'Start' >> beam.Create(range(5))
+
+      # Certain triggers, such as AfterCount, are incompatible with combiner
+      # lifting. We can use that fact to prevent combiners from being lifted.
+      if not lift_combiners:
+        pcoll |= beam.WindowInto(
+            window.GlobalWindows(),
+            trigger=trigger.AfterCount(5),
+            accumulation_mode=trigger.AccumulationMode.DISCARDING)
+
+      pcoll |= 'Do' >> beam.CombineGlobally(CallSequenceEnforcingCombineFn())
+      assert_that(pcoll, equal_to([10]))
+
+    # Ensure that _teardown_called equals True for all CombineFns.
+    for instance in CallSequenceEnforcingCombineFn.instances:
+      self.assertTrue(instance._teardown_called)
+
+
+@attr('ValidatesRunner')
+class CombineFnLifecycleTest(BaseCombineFnLifecycleTest):
+  def setUp(self):
+    self.pipeline = TestPipeline(is_integration_test=True)
+    options = self.pipeline.get_pipeline_options()
+    standard_options = options.view_as(StandardOptions)
+    experiments = options.view_as(DebugOptions).experiments or []
+
+    if 'DataflowRunner' in standard_options.runner and \
+       not standard_options.streaming and \
+       'beam_fn_api' not in experiments and 'use_runner_v2' not in experiments:
+      self.skipTest(
+          'Non-portable Dataflow batch worker does not support '

Review comment:
       On BEAM-3736 the recommendation was to reject the job. That is more problematic for the Java SDK, since it has many runners. AFAIK, for the Python SDK, Dataflow is the only non-portable runner (not counting the direct runner).
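The quoted test pins down a call order: `setup` exactly once before any accumulator work, and `teardown` exactly once after it. The sketch below exercises that order without a runner. `RecordingCombineFn` and `run_lifecycle` are hypothetical stand-ins, not Beam code. Real runners may split input across bundles and workers and merge partial accumulators differently. The `try`/`finally` around the work is this driver's own choice.

```python
class RecordingCombineFn:
    """Hypothetical CombineFn-like class that records each lifecycle call."""

    def __init__(self):
        self.calls = []

    def setup(self):
        self.calls.append("setup")

    def create_accumulator(self):
        self.calls.append("create_accumulator")
        return 0

    def add_input(self, acc, element):
        self.calls.append("add_input")
        return acc + element

    def merge_accumulators(self, accumulators):
        self.calls.append("merge_accumulators")
        return sum(accumulators)

    def extract_output(self, acc):
        self.calls.append("extract_output")
        return acc

    def teardown(self):
        self.calls.append("teardown")


def run_lifecycle(fn, partitions):
    """Hypothetical driver: one accumulator per partition, then a single merge."""
    fn.setup()
    try:
        partials = []
        for part in partitions:
            acc = fn.create_accumulator()
            for element in part:
                acc = fn.add_input(acc, element)
            partials.append(acc)
        return fn.extract_output(fn.merge_accumulators(partials))
    finally:
        # In this driver, teardown runs even if a combine step raises.
        fn.teardown()


fn = RecordingCombineFn()
print(run_lifecycle(fn, [[0, 1], [2, 3, 4]]))  # 10
print(fn.calls[0], fn.calls[-1])  # setup teardown
```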







[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-722032148


   > I wonder if this broke ValidatesRunner tests on RunnerV1
   
   Probably yes. [This](https://github.com/apache/beam/blob/c3407f379599d50fa4b4e3ae6bffc132080e535a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L437) condition isn't working as expected: it produces false positives (it wrongly concludes that setup/teardown were overridden), hence the exception.
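A common source of this kind of false positive is comparing a bound method (`fn.setup`) with the plain function on the base class (`Base.setup`). The two are never equal, so the check always reports an override. The thread does not say whether that is the cause here. The self-contained sketch below contrasts that comparison with one that looks up the attribute on the instance's class. `Base` stands in for `CombineFn`, and `overrides` and `naive_overrides` are hypothetical helpers, not the condition in `dataflow_runner.py`.

```python
class Base:
    """Stand-in for a base class with no-op lifecycle hooks."""

    def setup(self):
        pass

    def teardown(self):
        pass


class Plain(Base):
    pass


class WithSetup(Base):
    def setup(self):
        self.client = object()


def overrides(obj, base, name):
    """True only if type(obj) resolves `name` to a different function than base."""
    return getattr(type(obj), name) is not getattr(base, name)


def naive_overrides(obj, base, name):
    # Bound method vs. plain function: never equal, so this is always True.
    return getattr(obj, name) != getattr(base, name)


print(overrides(Plain(), Base, "setup"))        # False
print(overrides(WithSetup(), Base, "setup"))    # True
print(naive_overrides(Plain(), Base, "setup"))  # True (false positive)
```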





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661









[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503914424



##########
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_test.py
##########

Review comment:
       I've added code that rejects the job and clearly explains why it was rejected.
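The sketch below shows one way such a check could look under stated assumptions. `validate_combine_fns` and its `portable` flag are hypothetical, `Base` plays the role of `CombineFn`, and the error text is illustrative rather than the message added in this PR. The idea is to fail at submission time instead of silently skipping `setup`/`teardown` on a runner that cannot call them. The suggestion to use Runner v2 mirrors the `use_runner_v2` experiment checked in the test above.

```python
class Base:
    """Stand-in for CombineFn with no-op lifecycle hooks."""

    def setup(self):
        pass

    def teardown(self):
        pass


class NeedsSetup(Base):
    def setup(self):
        self.resource = "opened"


def validate_combine_fns(fns, portable):
    """Hypothetical pre-submission check: reject the job up front instead of
    running it on a runner that would never call setup/teardown."""
    if portable:
        return
    for fn in fns:
        for hook in ("setup", "teardown"):
            # Compare class-level functions so bound methods cause no false positives.
            if getattr(type(fn), hook) is not getattr(Base, hook):
                raise ValueError(
                    "%s overrides CombineFn.%s, which this non-portable runner "
                    "does not support; use a portable runner (for Dataflow, "
                    "Runner v2) or remove the override."
                    % (type(fn).__name__, hook))


validate_combine_fns([Base(), NeedsSetup()], portable=True)  # passes
try:
    validate_combine_fns([NeedsSetup()], portable=False)
except ValueError as err:
    print(err)
```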







[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `51.72%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.49%   -0.02%     
   ==========================================
     Files         455      455              
     Lines       54867    54876       +9     
   ==========================================
   - Hits        45272    45270       -2     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.80% <ø> (+0.35%)` | :arrow_up: |
   | [...beam/runners/portability/local\_job\_service\_main.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9sb2NhbF9qb2Jfc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <53.33%> (-0.57%)` | :arrow_down: |
   | [...runners/interactive/display/pcoll\_visualization.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kaXNwbGF5L3Bjb2xsX3Zpc3VhbGl6YXRpb24ucHk=) | `85.34% <77.77%> (+0.07%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.34% <0.00%> (-0.14%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.63% <0.00%> (+0.15%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...44c1bd3](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [15fd04d...ab3572d](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.50%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54867    54910      +43     
   ==========================================
   + Hits        45272    45304      +32     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=) | `98.38% <100.00%> (+0.23%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/operations.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9ucy5weQ==) | `70.39% <100.00%> (+1.05%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `88.73% <100.00%> (+0.10%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.32% <0.00%> (-1.20%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <0.00%> (-0.57%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.23% <0.00%> (+2.38%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...a0d436a](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] tvalentyn commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-722032127


   Looks like this test is not marked as a ValidatesRunner test in Beam. Adding the annotation in https://github.com/apache/beam/pull/13265.





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-718875468


   Run Python Dataflow V2 ValidatesRunner





[GitHub] [beam] tvalentyn commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
tvalentyn commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r516397331



##########
File path: sdks/python/apache_beam/transforms/userstate.py
##########
@@ -357,6 +357,10 @@ def prefetch(self):
     # The default implementation here does nothing.
     pass
 
+  def finalize(self):

Review comment:
       @robertwb do you see any concerns with adding a top-level `finalize` method here?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -411,6 +411,33 @@ def visit_transform(self, transform_node):
 
     return FlattenInputVisitor()
 
+  @staticmethod
+  def combinefn_visitor():
+    # Imported here to avoid circular dependencies.
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam import core
+
+    class CombineFnVisitor(PipelineVisitor):
+      """Checks if `CombineFn` has non-default setup or teardown methods.
+      If yes, raises `ValueError`.
+      """
+      def visit_transform(self, applied_transform):
+        transform = applied_transform.transform
+        if isinstance(transform, core.ParDo) and isinstance(
+            transform.fn, core.CombineValuesDoFn):
+          if self._overrides_setup_or_teardown(transform.fn.combinefn):
+            raise ValueError(
+                'CombineFn.setup and CombineFn.teardown are '
+                'not supported with non-portable Dataflow '
+                'runner. Please use Dataflow Runner V2 instead.')
+
+      @staticmethod
+      def _overrides_setup_or_teardown(combinefn):
+        return combinefn.__class__.setup is not core.CombineFn.setup or \

Review comment:
       nit: prefer `()` over `\` for line continuation.
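A minimal sketch of the same override check written with parentheses instead of a backslash. `CombineFnBase`, `PlainFn` and `TeardownOnlyFn` are hypothetical stand-ins, not Beam classes. The check relies on Python 3 returning the plain function object when a method is looked up on a class, so an inherited hook is the identical object as the base one.

```python
class CombineFnBase:
    """Hypothetical stand-in for CombineFn with no-op lifecycle hooks."""

    def setup(self, *args, **kwargs):
        pass

    def teardown(self, *args, **kwargs):
        pass


def overrides_setup_or_teardown(combinefn):
    # Looked up on the class, an inherited method is the very same function
    # object as the base one, so `is not` detects an override.
    cls = combinefn.__class__
    return (
        cls.setup is not CombineFnBase.setup or
        cls.teardown is not CombineFnBase.teardown)


class PlainFn(CombineFnBase):
    pass


class TeardownOnlyFn(CombineFnBase):
    def teardown(self, *args, **kwargs):
        self.released = True
```

Overriding either hook is enough to trigger the check, which matches the visitor's intent of rejecting any non-default lifecycle method.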

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1975,10 +1990,13 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.deepcopy(self.fn) if isinstance(self.fn, CombineFn) \

Review comment:
       nit: [prefer](https://www.python.org/dev/peps/pep-0008/#:~:text=The%20preferred%20way%20of%20wrapping,a%20backslash%20for%20line%20continuation.) using `()` over the `\` line-continuation token.







[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-712251940


   Run Python Dataflow V2 ValidatesRunner





[GitHub] [beam] kamilwu commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-712848474


   Run Python_PVR_Flink PreCommit





[GitHub] [beam] yifanmai commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r514452511



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -877,17 +877,19 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   combining process proceeds as follows:
 
   1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
+  2. For each batch, the setup method is invoked.
+  3. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_input method is invoked to
+  4. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
-  4. The merge_accumulators method is invoked to combine accumulators from
+  5. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
      of the accumulators have had all the input value in their batches added to
      them. This operation is invoked repeatedly, until there is only one
      accumulator value left.
-  5. The extract_output operation is invoked on the final accumulator to get
+  6. The extract_output operation is invoked on the final accumulator to get
      the output value.
+  7. The teardown method is invoked.

Review comment:
       SGTM.
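A simplified, single-process sketch of the numbered steps in the docstring above, using a hypothetical CombineFn-shaped class (`RecordingSumFn`) that records each call. Here one instance handles every batch, so `setup` runs once; how a real runner batches inputs, reuses instances, and schedules `teardown` is runner-specific and not modeled.

```python
from typing import List


class RecordingSumFn:
    """Hypothetical CombineFn-shaped class that logs each lifecycle call."""

    def __init__(self):
        self.calls: List[str] = []

    def setup(self):
        self.calls.append('setup')

    def create_accumulator(self):
        self.calls.append('create_accumulator')
        return 0

    def add_input(self, accumulator, element):
        self.calls.append('add_input')
        return accumulator + element

    def merge_accumulators(self, accumulators):
        self.calls.append('merge_accumulators')
        return sum(accumulators)

    def extract_output(self, accumulator):
        self.calls.append('extract_output')
        return accumulator

    def teardown(self):
        self.calls.append('teardown')


def run_lifecycle(fn, batches):
    # Hypothetical driver: setup, one accumulator per batch, merge,
    # extract, then teardown.
    fn.setup()
    accumulators = []
    for batch in batches:
        acc = fn.create_accumulator()
        for element in batch:
            acc = fn.add_input(acc, element)
        accumulators.append(acc)
    result = fn.extract_output(fn.merge_accumulators(accumulators))
    fn.teardown()
    return result


fn = RecordingSumFn()
result = run_lifecycle(fn, [[1, 2], [3]])
```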







[GitHub] [beam] kamilwu commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
kamilwu commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r503337032



##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -895,6 +897,15 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   def default_label(self):
     return self.__class__.__name__
 
+  def setup(self):
+    """Called to prepare an instance for combining.
+
+    This method can be useful if there is some state that needs to be loaded
+    before executing any of the other methods. The resources can then be
+    disposed in ``CombineFn.teardown``.

Review comment:
       Thanks.
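To illustrate the docstring's intent, here is a hypothetical CombineFn-shaped class that loads a lookup table in `setup` and drops it in `teardown`. `load_weights` is a made-up placeholder for an expensive load such as reading a file or creating a client. It is not a Beam API.

```python
from typing import Dict, Optional


def load_weights() -> Dict[str, float]:
    # Hypothetical placeholder for an expensive load (file, model, client).
    return {'a': 1.0, 'b': 2.5}


class WeightedSumFn:
    """Hypothetical CombineFn-shaped class; the weights exist only between
    setup and teardown."""

    def __init__(self):
        self._weights: Optional[Dict[str, float]] = None

    def setup(self):
        self._weights = load_weights()

    def create_accumulator(self):
        return 0.0

    def add_input(self, accumulator, key):
        # Relies on setup having run first; unknown keys add nothing.
        return accumulator + self._weights.get(key, 0.0)

    def merge_accumulators(self, accumulators):
        return sum(accumulators)

    def extract_output(self, accumulator):
        return accumulator

    def teardown(self):
        # Drop the reference so the loaded data can be garbage-collected.
        self._weights = None


fn = WeightedSumFn()
fn.setup()
acc = fn.create_accumulator()
for key in ['a', 'b', 'c']:
    acc = fn.add_input(acc, key)
total = fn.extract_output(acc)
fn.teardown()
```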







[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/beam/pull/13048/graphs/tree.svg?width=650&height=150&src=pr&token=qcbbAh8Fj1)](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree)
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [15fd04d...fc80e43](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] yifanmai commented on a change in pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
yifanmai commented on a change in pull request #13048:
URL: https://github.com/apache/beam/pull/13048#discussion_r513623504



##########
File path: CHANGES.md
##########
@@ -62,6 +62,7 @@
 
 ## New Features / Improvements
 * Added support for avro payload format in Beam SQL Kafka Table ([BEAM-10885](https://issues.apache.org/jira/browse/BEAM-10885))
+* Added CombineFn.setup and CombineFn.teardown to Python SDK. These methods let you initialize a state before any of the other methods of the CombineFn is executed and clean that state up later on. ([BEAM-3736](https://issues.apache.org/jira/browse/BEAM-3736))

Review comment:
       nit: 'a state' -> 'the CombineFn's state'

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -1975,10 +1990,14 @@ def add_input_types(transform):
       return combined
 
     if self.has_defaults:
-      combine_fn = (
-          self.fn if isinstance(self.fn, CombineFn) else
-          CombineFn.from_callable(self.fn))
-      default_value = combine_fn.apply([], *self.args, **self.kwargs)
+      combine_fn = copy.deepcopy(
+          self.fn if isinstance(self.fn, CombineFn) else CombineFn.

Review comment:
       nit: this can be `copy.deepcopy(self.fn) if...`, i.e., the copy is only needed in the first branch.
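This is a sketch of the suggested shape. It assumes, as the diff suggests, that the copy exists so that computing the default value does not mutate the user's `CombineFn` instance (presumably by running its lifecycle methods). `ToyCombineFn` and `CallableWrapperFn` are hypothetical stand-ins. The point is only that `from_callable` already returns a fresh object, so deep-copying it would be redundant.

```python
import copy


class ToyCombineFn:
    """Hypothetical stand-in for CombineFn; setup mutates instance state."""

    def __init__(self):
        self.setup_called = False

    def setup(self):
        self.setup_called = True

    @staticmethod
    def from_callable(fn):
        # Builds a brand-new wrapper on every call.
        return CallableWrapperFn(fn)


class CallableWrapperFn(ToyCombineFn):
    def __init__(self, fn):
        super().__init__()
        self.fn = fn


def combine_fn_for_defaults(fn):
    # Only a user-supplied instance needs copying; the wrapper is already fresh.
    return (
        copy.deepcopy(fn) if isinstance(fn, ToyCombineFn)
        else ToyCombineFn.from_callable(fn))


user_fn = ToyCombineFn()
private_fn = combine_fn_for_defaults(user_fn)
private_fn.setup()  # Mutates the copy, not the caller's object.
```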

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -877,17 +877,19 @@ class CombineFn(WithTypeHints, HasDisplayData, urns.RunnerApiFn):
   combining process proceeds as follows:
 
   1. Input values are partitioned into one or more batches.
-  2. For each batch, the create_accumulator method is invoked to create a fresh
+  2. For each batch, the setup method is invoked.
+  3. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_input method is invoked to
+  4. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
-  4. The merge_accumulators method is invoked to combine accumulators from
+  5. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
      of the accumulators have had all the input value in their batches added to
      them. This operation is invoked repeatedly, until there is only one
      accumulator value left.
-  5. The extract_output operation is invoked on the final accumulator to get
+  6. The extract_output operation is invoked on the final accumulator to get
      the output value.
+  7. The teardown method is invoked.

Review comment:
       Question: What is the expected behavior if setup throws an exception? Should teardown still be called?
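The following sketches one possible policy. It is an assumption, not Beam's documented behavior. The policy mirrors Python's `with` statement, where `__exit__` is skipped if `__enter__` raises. Under this policy, teardown runs only after a successful `setup`, but it still runs if a later step fails. `run_with_lifecycle`, `FlakyFn` and `failing_work` are hypothetical.

```python
from typing import Any, Callable, List


def run_with_lifecycle(fn: Any, work: Callable[[Any], Any]) -> Any:
    # If setup raises, the exception propagates before the try block, so
    # teardown is skipped, much like `with` skips __exit__ when __enter__ fails.
    fn.setup()
    try:
        return work(fn)
    finally:
        # Runs once setup has succeeded, whether or not `work` raised.
        fn.teardown()


class FlakyFn:
    """Hypothetical fn that records lifecycle events and can fail in setup."""

    def __init__(self, fail_setup: bool = False):
        self.fail_setup = fail_setup
        self.events: List[str] = []

    def setup(self):
        self.events.append('setup')
        if self.fail_setup:
            raise RuntimeError('setup failed')

    def teardown(self):
        self.events.append('teardown')


def failing_work(fn):
    raise ValueError('work failed')
```

The alternative, calling teardown even when setup fails, would require every teardown to tolerate partially initialized state.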

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -411,6 +411,33 @@ def visit_transform(self, transform_node):
 
     return FlattenInputVisitor()
 
+  @staticmethod
+  def combinefn_visitor():
+    # Imported here to avoid circular dependencies.
+    from apache_beam.pipeline import PipelineVisitor
+    from apache_beam import core
+
+    class CombineFnVisitor(PipelineVisitor):
+      """Checks if `CombineFn` has non-default setup or teardown methods.
+      If yes, raises `ValueError`.
+      """
+      def visit_transform(self, applied_transform):
+        transform = applied_transform.transform
+        if isinstance(transform, core.ParDo) and isinstance(
+            transform.fn, core.CombineValuesDoFn):
+          if self._overrides_setup_or_teardown(transform.fn.combinefn):
+            raise ValueError(
+                'CombineFn.setup and CombineFn.teardown are '
+                'not supported with non-portable Dataflow '
+                'runner. Please use Dataflow Runner V2 instead.')

Review comment:
       Question: Is there any plan to support this in non-portable Dataflow Runner, or will this be a V2 feature only?

##########
File path: sdks/python/apache_beam/transforms/combinefn_lifecycle_pipeline.py
##########
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+from typing import Set
+from typing import Tuple
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import TypeOptions
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms import combiners
+from apache_beam.transforms import trigger
+from apache_beam.transforms import userstate
+from apache_beam.transforms import window
+from apache_beam.typehints import with_input_types
+from apache_beam.typehints import with_output_types
+
+
+@with_input_types(int)
+@with_output_types(int)
+class CallSequenceEnforcingCombineFn(beam.CombineFn):
+  instances = set()  # type: Set[CallSequenceEnforcingCombineFn]
+
+  def __init__(self):
+    super(CallSequenceEnforcingCombineFn, self).__init__()
+    self._setup_called = False
+    self._teardown_called = False
+
+  def setup(self, *args, **kwargs):
+    assert not self._setup_called, 'setup should not be called twice'
+    assert not self._teardown_called, 'setup should be called before teardown'
+    # Keep track of instances so that we can check if teardown is called
+    # properly after pipeline execution.
+    self.instances.add(self)
+    self._setup_called = True
+
+  def create_accumulator(self, *args, **kwargs):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return 0
+
+  def add_input(self, mutable_accumulator, element, *args, **kwargs):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    mutable_accumulator += element
+    return mutable_accumulator
+
+  def add_inputs(self, mutable_accumulator, elements, *args, **kwargs):
+    return self.add_input(mutable_accumulator, sum(elements))
+
+  def merge_accumulators(self, accumulators, *args, **kwargs):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return sum(accumulators)
+
+  def extract_output(self, accumulator, *args, **kwargs):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not have been called'
+    return accumulator
+
+  def teardown(self, *args, **kwargs):
+    assert self._setup_called, 'setup should have been called'
+    assert not self._teardown_called, 'teardown should not be called twice'
+    self._teardown_called = True
+
+
+@with_input_types(Tuple[None, str])
+@with_output_types(Tuple[int, str])
+class IndexAssigningDoFn(beam.DoFn):
+  state_param = beam.DoFn.StateParam(
+      userstate.CombiningValueStateSpec(
+          'index', beam.coders.VarIntCoder(), CallSequenceEnforcingCombineFn()))
+
+  def process(self, element, state=state_param):
+    _, value = element
+    current_index = state.read()
+    yield current_index, value
+    state.add(1)
+
+
+def run_combine(pipeline, input_elements=5, lift_combiners=True):
+  # Calculate the excepted result, which is the sum of an arythmetic sequence.

Review comment:
       nit: 'arythmetic' -> 'arithmetic'
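For reference, the closed form for an arithmetic sequence with first term a, step d and n terms is n(2a + (n - 1)d) / 2. The excerpt does not show which sequence the test feeds in, so this helper takes all three values explicitly. `arithmetic_series_sum` is a hypothetical name, not part of the PR.

```python
def arithmetic_series_sum(first: int, step: int, count: int) -> int:
    # n * (2a + (n - 1) * d) is always even for integers, so // is exact.
    return count * (2 * first + (count - 1) * step) // 2


# Assuming inputs 0, 1, 2, 3, 4 (i.e. input_elements = 5):
expected = arithmetic_series_sum(0, 1, 5)  # 10
```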







[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.01%`.
   > The diff coverage is `51.72%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.49%   -0.02%     
   ==========================================
     Files         455      455              
     Lines       54867    54876       +9     
   ==========================================
   - Hits        45272    45269       -3     
   - Misses       9595     9607      +12     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...eam/runners/interactive/interactive\_environment.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9pbnRlcmFjdGl2ZV9lbnZpcm9ubWVudC5weQ==) | `89.80% <ø> (+0.35%)` | :arrow_up: |
   | [...beam/runners/portability/local\_job\_service\_main.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9sb2NhbF9qb2Jfc2VydmljZV9tYWluLnB5) | `0.00% <0.00%> (ø)` | |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <53.33%> (-0.57%)` | :arrow_down: |
   | [...runners/interactive/display/pcoll\_visualization.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kaXNwbGF5L3Bjb2xsX3Zpc3VhbGl6YXRpb24ucHk=) | `85.34% <77.77%> (+0.07%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `94.21% <0.00%> (-0.27%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.63% <0.00%> (+0.15%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...44c1bd3](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] edited a comment on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] edited a comment on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/3d6cc0ed9ed537229b27b5dbe73288f21b0e351c?el=desc) will **increase** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   
   ```diff
   @@           Coverage Diff           @@
   ##           master   #13048   +/-   ##
   =======================================
     Coverage   82.48%   82.49%           
   =======================================
     Files         455      455           
     Lines       54876    54876           
   =======================================
   + Hits        45266    45270    +4     
   + Misses       9610     9606    -4     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `89.20% <0.00%> (+0.44%)` | :arrow_up: |
   | [.../python/apache\_beam/transforms/periodicsequence.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9wZXJpb2RpY3NlcXVlbmNlLnB5) | `98.24% <0.00%> (+1.75%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [4836937...e3326d1](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   





[GitHub] [beam] codecov[bot] commented on pull request #13048: [BEAM-3736] Add CombineFn.setup and CombineFn.teardown to Python SDK

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on pull request #13048:
URL: https://github.com/apache/beam/pull/13048#issuecomment-705515661


   # [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=h1) Report
   > Merging [#13048](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=desc) into [master](https://codecov.io/gh/apache/beam/commit/6bf56f92b34f7c15b752c46eca19489a604c4775?el=desc) will **decrease** coverage by `0.00%`.
   > The diff coverage is `100.00%`.
   
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #13048      +/-   ##
   ==========================================
   - Coverage   82.51%   82.50%   -0.01%     
   ==========================================
     Files         455      455              
     Lines       54867    54910      +43     
   ==========================================
   + Hits        45272    45304      +32     
   - Misses       9595     9606      +11     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=tree) | Coverage Δ | |
   |---|---|---|
   | [...on/apache\_beam/runners/direct/helper\_transforms.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvaGVscGVyX3RyYW5zZm9ybXMucHk=) | `98.38% <100.00%> (+0.23%)` | :arrow_up: |
   | [...ks/python/apache\_beam/runners/worker/operations.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvb3BlcmF0aW9ucy5weQ==) | `70.39% <100.00%> (+1.05%)` | :arrow_up: |
   | [sdks/python/apache\_beam/transforms/core.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb3JlLnB5) | `88.73% <100.00%> (+0.10%)` | :arrow_up: |
   | [sdks/python/apache\_beam/io/source\_test\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vc291cmNlX3Rlc3RfdXRpbHMucHk=) | `88.28% <0.00%> (-1.36%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `88.32% <0.00%> (-1.20%)` | :arrow_down: |
   | [sdks/python/apache\_beam/io/gcp/bigquery\_tools.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vaW8vZ2NwL2JpZ3F1ZXJ5X3Rvb2xzLnB5) | `87.79% <0.00%> (-0.57%)` | :arrow_down: |
   | [...nners/portability/fn\_api\_runner/worker\_handlers.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL3dvcmtlcl9oYW5kbGVycy5weQ==) | `80.57% <0.00%> (-0.18%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/13048/diff?src=pr&el=tree#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.23% <0.00%> (+2.38%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=continue).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=footer). Last update [6bf56f9...a0d436a](https://codecov.io/gh/apache/beam/pull/13048?src=pr&el=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
   

