Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/02/02 02:09:07 UTC

[GitHub] [beam] yifanmai opened a new pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

yifanmai opened a new pull request #13877:
URL: https://github.com/apache/beam/pull/13877


   Currently `translations.pack_combiners` produces a `CombinePerKey` with no subtransforms, but some runners do not treat `CombinePerKey` as a primitive transform and require the `GroupByKey`, `Combine`, and/or `ParDo(CombineValuesDoFn)` subtransforms to be present. This change updates `translations.pack_combiners` to add the required subtransforms to the `CombinePerKey`.
   
   Additionally, this reverts some changes to DataflowRunner from #13763. It removes a workaround that used `PTransformOverride` to recreate the subtransforms.
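   
   For reference, the composite structure that such runners expect is visible in the unoptimized pipeline proto; a minimal illustration (toy pipeline, not code from this PR):
   
   ```python
   import apache_beam as beam
   
   # In the unoptimized proto, the CombinePerKey composite carries
   # GroupByKey / Combine subtransforms; these are what non-portable
   # runners rely on.
   p = beam.Pipeline()
   _ = p | beam.Create([('k', 1), ('k', 2)]) | beam.CombinePerKey(sum)
   proto = p.to_runner_api()
   for transform in proto.components.transforms.values():
     if transform.subtransforms:
       print(transform.unique_name, '->', list(transform.subtransforms))
   ```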
   
   





[GitHub] [beam] yifanmai commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

yifanmai commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-772096361


   OK, still running into a bunch of issues:
   
   - FnApiRunner doesn't actually use the sub-transforms in CombinePerKey. So I can't use it to test that the subtransforms in the packed CombinePerKey are correct. Is there any runner that actually uses the subtransforms that I can use for testing? (See the sketch below for the kind of check I have in mind.)
   - DataflowRunner has a [CombineValuesPTransformOverride](https://github.com/apache/beam/blob/0078bb35ba4aef9410d681d8b4e2c16d9f56433d/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py#L155) that replaces the composite `CombineValues` with a primitive `CombineValuesReplacement`. We need _another_ code path to mimic this replacement.
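   
   For the first point, this is roughly the check I would want to write (a sketch; the helper name is made up):
   
   ```python
   from apache_beam.portability import common_urns
   
   def check_combine_per_key_subtransforms(pipeline_proto):
     # Every CombinePerKey composite in the proto should carry GroupByKey
     # and Combine(GroupedValues) children.
     transforms = pipeline_proto.components.transforms
     for transform in transforms.values():
       if transform.spec.urn == common_urns.composites.COMBINE_PER_KEY.urn:
         child_urns = {
             transforms[sid].spec.urn for sid in transform.subtransforms
         }
         assert common_urns.primitives.GROUP_BY_KEY.urn in child_urns
         assert (
             common_urns.combine_components.COMBINE_GROUPED_VALUES.urn in
             child_urns)
   ```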





[GitHub] [beam] chamikaramj commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

chamikaramj commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-772103307


   "FnApiRunner doesn't actually use the sub-transforms in CombinePerKey. So I can't use it to test that the subtransforms in the packed CombinePerKey are correct. Is there any runner that actually uses the subtransforms that I can use for testing?"
   
   Can't we just roll back to the previous behavior, where we were generating protos correctly (before 2.27.0), instead of trying to re-build the proto (which is brittle)?
   
   Dataflow Runner v2 should use the proto when the experiment "use_portable_job_submission" is set.
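   
   i.e. something like:
   
   ```python
   # Sketch: opting a Dataflow job into Runner v2 plus portable job
   # submission via experiments.
   from apache_beam.options.pipeline_options import PipelineOptions
   
   options = PipelineOptions([
       '--runner=DataflowRunner',
       '--experiments=use_runner_v2',
       '--experiments=use_portable_job_submission',
   ])
   ```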





[GitHub] [beam] robertwb commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

robertwb commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-775564361


   > OK, still running into a bunch of issues:
   > 
   > * FnApiRunner doesn't actually use the sub-transforms in CombinePerKey. So I can't use it to test that the subtransforms in the packed CombinePerKey are correct. Is there any runner that actually uses the subtransforms that I can use for testing?
   
   You could disable combiner lifting in the FnApiRunner to test things out.
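   
   Something along these lines (a sketch; the `translations` helper names and signatures are from memory, so treat them as assumptions):
   
   ```python
   from apache_beam.runners.portability.fn_api_runner import translations
   
   # Run the standard optimization phases with lift_combiners dropped, so
   # the CombinePerKey composites (and their subtransforms) survive into
   # the optimized proto.
   phases = [
       phase for phase in translations.standard_optimize_phases()
       if phase is not translations.lift_combiners
   ]
   optimized_proto = translations.optimize_pipeline(
       pipeline_proto, phases=phases, known_runner_urns=frozenset())
   ```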
   
   > * DataflowRunner has a [CombineValuesPTransformOverride](https://github.com/apache/beam/blob/0078bb35ba4aef9410d681d8b4e2c16d9f56433d/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py#L155) that replaces the composite `CombineValues` with a primitive `CombineValuesReplacement`. We need _another_ code path to mimic this replacement.
   
   Ugh. 
   
   Maybe we could simplify things by only supporting portable job submission? 






[GitHub] [beam] robertwb commented on a change in pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

robertwb commented on a change in pull request #13877:
URL: https://github.com/apache/beam/pull/13877#discussion_r572469235



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:
+      # CombinePerKey may contain GroupByKey and Combine subtransforms.
+      if transform.subtransforms:
+        assert len(transform.subtransforms) == 2
+
+        group_by_key_transform = context.components.transforms[
+            transform.subtransforms[0]]

Review comment:
       I don't think there's any requirement that they come in a particular order. 
   
   What we can assume, or use as a precondition to grouping in `_group_stages_by_key`, is that there will be two subtransforms of the expected kind.
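   
   Concretely, something like this (a sketch reusing the names from the diff above):
   
   ```python
   # Resolve the two expected children by urn rather than by position.
   children = [
       context.components.transforms[sid] for sid in transform.subtransforms
   ]
   by_urn = {child.spec.urn: child for child in children}
   group_by_key_transform = by_urn[common_urns.primitives.GROUP_BY_KEY.urn]
   combine_grouped_values_transform = by_urn[
       common_urns.combine_components.COMBINE_GROUPED_VALUES.urn]
   ```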

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]

Review comment:
       There is a check for `combine_payload.combine_fn.urn == python_urns.PICKLED_COMBINE_FN` above.
   
   It is unfortunate, however, to have this Python-specific stuff embedded in the SDK. Ideally, the notion of a packed CombineFn would be part of the model, and one could create such a CombineFn from a set of CombineFns.
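   
   For context, the packing today goes through the SDK's tuple-combining CombineFns, roughly like this (a sketch, from memory):
   
   ```python
   # A model-level packed CombineFn would replace this Python-specific
   # construction.
   from apache_beam.transforms.combiners import SingleInputTupleCombineFn
   
   # Applies each wrapped fn to the same input and emits a tuple of results.
   packed_fn = SingleInputTupleCombineFn(sum, min, max)
   ```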

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:

Review comment:
       We would still have to insert the multiplexing DoFn as a follow-on. 
   
   I do agree this function is getting a bit unwieldy though. 
   
   Could we perhaps factor this out into a function `create_combine_per_key_transform(combine_fn, input_pcoll, output_pcoll, template)` that also creates the internal structure? 
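   
   Rough shape of what I mean (the signature is from the suggestion above; the body is elided):
   
   ```python
   def create_combine_per_key_transform(
       combine_fn, input_pcoll, output_pcoll, template):
     """Builds a CombinePerKey transform proto, including its internal
     GroupByKey / Combine structure, cloning ids and coders from template."""
     ...
   ```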

##########
File path: sdks/python/apache_beam/transforms/core.py
##########
@@ -2171,6 +2171,7 @@ def __init__(
       runtime_type_check,  # type: bool
   ):
     super(CombineValuesDoFn, self).__init__()
+    self.input_pcoll_type = input_pcoll_type

Review comment:
       Whence this change?

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:
+      # CombinePerKey may contain GroupByKey and Combine subtransforms.
+      if transform.subtransforms:
+        assert len(transform.subtransforms) == 2
+
+        group_by_key_transform = context.components.transforms[
+            transform.subtransforms[0]]
+        assert group_by_key_transform.spec.urn == common_urns.primitives.GROUP_BY_KEY.urn
+        original_group_by_key_transforms.append(group_by_key_transform)
+
+        combine_grouped_values_transform = context.components.transforms[
+            transform.subtransforms[1]]
+        assert combine_grouped_values_transform.spec.urn == common_urns.combine_components.COMBINE_GROUPED_VALUES.urn
+        original_combine_grouped_values_transforms.append(
+            combine_grouped_values_transform)
+
+        # Combine may contain a ParDo subtransform.
+        if combine_grouped_values_transform.subtransforms:
+          assert len(combine_grouped_values_transform.subtransforms) == 1
+
+          combine_values_par_do_transform = context.components.transforms[
+              combine_grouped_values_transform.subtransforms[0]]
+          assert combine_values_par_do_transform.spec.urn == common_urns.primitives.PAR_DO.urn
+          original_combine_values_par_do_transforms.append(
+              combine_values_par_do_transform)
+
+    # Pack the subtransforms if and only if the original transform had subtransforms.
+    if original_group_by_key_transforms or original_combine_grouped_values_transforms:
+      assert original_group_by_key_transforms and original_combine_grouped_values_transforms
+      # For each subtransform, reuse an arbitrary original subtransform as a template,
+      # and then rewrite it with the correct input, output and payload.
+      # Also reuse an arbitrary GroupByKey output PCollection.
+
+      grouped_pcoll_id = next(
+          iter(original_group_by_key_transforms[0].outputs.values()))
+
+      packed_group_by_key_transform_name = (
+          pack_transform_name + '/CombinePerKey/GroupByKey')
+      packed_group_by_key_transform_key = unique_name(
+          context.components.transforms, packed_group_by_key_transform_name)
+      context.components.transforms[packed_group_by_key_transform_key].CopyFrom(
+          original_group_by_key_transforms[0])
+      context.components.transforms[
+          packed_group_by_key_transform_key].unique_name = packed_group_by_key_transform_name
+      context.components.transforms[
+          packed_group_by_key_transform_key].outputs.clear()
+      context.components.transforms[packed_group_by_key_transform_key].outputs[
+          'None'] = grouped_pcoll_id
+
+      packed_combine_grouped_values_transform_name = (
+          pack_transform_name + '/CombinePerKey/Combine')
+      packed_combine_grouped_values_transform_key = unique_name(
+          context.components.transforms,
+          packed_combine_grouped_values_transform_name)
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].CopyFrom(
+              original_combine_grouped_values_transforms[0])
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].unique_name = packed_combine_grouped_values_transform_name
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].inputs.clear()
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].inputs[
+              '0'] = grouped_pcoll_id
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].outputs.clear()
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].outputs[
+              '0'] = pack_pcoll_id
+      context.components.transforms[
+          packed_combine_grouped_values_transform_key].spec.payload = beam_runner_api_pb2.CombinePayload(
+              combine_fn=pack_combine_fn.to_runner_api(context),
+              accumulator_coder_id=tuple_accumulator_coder_id
+          ).SerializeToString()
+
+      if original_combine_values_par_do_transforms:

Review comment:
       Why this if statement? 

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations_test.py
##########
@@ -67,6 +67,7 @@ def expand(self, pcoll):
     environment = environments.DockerEnvironment.from_options(
         pipeline_options.PortableOptions(sdk_location='container'))
     pipeline_proto = pipeline.to_runner_api(default_environment=environment)
+    # logging.error('[debug:yifanmai] unoptimized pipeline %s' % pipeline_proto)

Review comment:
       Remove in final PR.







[GitHub] [beam] chamikaramj commented on a change in pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

chamikaramj commented on a change in pull request #13877:
URL: https://github.com/apache/beam/pull/13877#discussion_r568916188



##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -462,62 +462,6 @@ def run_pipeline(self, pipeline, options):
 
     self._maybe_add_unified_worker_missing_options(options)
 
-    from apache_beam.transforms import environments
-    if options.view_as(SetupOptions).prebuild_sdk_container_engine:

Review comment:
       So this optimization is not needed for Dataflow Runner v1 (just to confirm)?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -544,6 +488,21 @@ def run_pipeline(self, pipeline, options):
     if use_fnapi and not apiclient._use_unified_worker(options):
       pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
+    from apache_beam.transforms import environments
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).worker_harness_container_image = (
+          self._default_environment.container_image)

Review comment:
       Where is "self._default_environment.container_image" set ?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -552,6 +511,22 @@ def run_pipeline(self, pipeline, options):
     self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
         return_context=True, default_environment=self._default_environment)
 
+    # Optimize the pipeline if it is not streaming and the
+    # disable_optimize_pipeline_for_dataflow experiment has not been set.
+    if (not options.view_as(StandardOptions).streaming and
+        not options.view_as(DebugOptions).lookup_experiment(
+            "disable_optimize_pipeline_for_dataflow")):

Review comment:
       Is this a new optimization? Usually, for new optimizations, the preference is not to opt users in by default.
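   
   e.g. gate it behind an opt-in experiment instead (a sketch; the experiment name below is made up):
   
   ```python
   # Hypothetical opt-in: only optimize when the user asks for it.
   if (not options.view_as(StandardOptions).streaming and
       options.view_as(DebugOptions).lookup_experiment(
           'optimize_pipeline_for_dataflow')):  # made-up experiment name
     ...  # run the optimizer only when explicitly requested
   ```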

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]

Review comment:
       This "core.CombineFn.from_runner_api" call will break for x-lang Combine transforms in the proto. Can we make this optimization opt in instead of making this the default ? If this is the default it should work for all pipelines including x-lang.

##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
##########
@@ -1000,25 +1000,151 @@ def make_pack_name(names):
         *[
             core.CombineFn.from_runner_api(combine_payload.combine_fn, context)  # type: ignore[arg-type]
             for combine_payload in combine_payloads
-        ]).to_runner_api(context)  # type: ignore[arg-type]
+        ])  # type: ignore[arg-type]
     pack_transform = beam_runner_api_pb2.PTransform(
-        unique_name=pack_transform_name + '/Pack',
+        unique_name=pack_transform_name + '/CombinePerKey',
         spec=beam_runner_api_pb2.FunctionSpec(
             urn=common_urns.composites.COMBINE_PER_KEY.urn,
             payload=beam_runner_api_pb2.CombinePayload(
-                combine_fn=pack_combine_fn,
+                combine_fn=pack_combine_fn.to_runner_api(context),
                 accumulator_coder_id=tuple_accumulator_coder_id).
             SerializeToString()),
         inputs={'in': input_pcoll_id},
         # 'None' single output key follows convention for CombinePerKey.
         outputs={'None': pack_pcoll_id},
         environment_id=fused_stage.environment)
     pack_stage = Stage(
-        pack_stage_name + '/Pack', [pack_transform],
+        pack_stage_name + '/CombinePerKey', [pack_transform],
         downstream_side_inputs=fused_stage.downstream_side_inputs,
         must_follow=fused_stage.must_follow,
         parent=fused_stage.parent,
         environment=fused_stage.environment)
+
+    # Traverse the subtransform structure.
+    original_group_by_key_transforms = []
+    original_combine_grouped_values_transforms = []
+    original_combine_values_par_do_transforms = []
+    for transform in transforms:

Review comment:
       This whole thing looks pretty brittle. Could we avoid removing the sub-transforms from the CombinePerKey transform in the first place (i.e., before generating the proto), so that we don't have to manually modify the proto here?

##########
File path: sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
##########
@@ -544,6 +488,21 @@ def run_pipeline(self, pipeline, options):
     if use_fnapi and not apiclient._use_unified_worker(options):
       pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
+    from apache_beam.transforms import environments
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (

Review comment:
       Default environment should have the correct container URL to be used by Dataflow (same as "worker_harness_container_image")







[GitHub] [beam] yifanmai commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

yifanmai commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-776288678


   > Maybe we could simplify things by only supporting portable job submission?
   
   Even with portable job submission, the subtransform structure is different (CombineValues is a primitive rather than a composite). [CombineValuesPTransformOverride](https://github.com/apache/beam/blob/595c25263638b3d5624875e46b69a442cedb09a7/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py#L128) performs this substitution in DataflowRunner [before](https://github.com/apache/beam/blob/595c25263638b3d5624875e46b69a442cedb09a7/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py#L479) the pipeline is converted to a proto. So the optimizer will still see the Dataflow-specific subtransform structure.





[GitHub] [beam] chamikaramj commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

chamikaramj commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-772056363


   cc: @robertwb 





[GitHub] [beam] github-actions[bot] commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

github-actions[bot] commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-1037214479


   This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.






[GitHub] [beam] github-actions[bot] commented on pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

github-actions[bot] commented on pull request #13877:
URL: https://github.com/apache/beam/pull/13877#issuecomment-1030616610


   This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the dev@beam.apache.org list. Thank you for your contributions.





[GitHub] [beam] github-actions[bot] closed pull request #13877: [BEAM-11715] Recreate subtransforms for packed combiners

github-actions[bot] closed pull request #13877:
URL: https://github.com/apache/beam/pull/13877


   

