You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/07/01 05:21:56 UTC

[beam] branch master updated: [BEAM-10321] retain environments in flatten for preventing it from being fused into stages running in foreign language SDKs

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 09258f2  [BEAM-10321] retain environments in flatten for preventing it from being fused into stages running in foreign language SDKs
     new 1805c66  Merge pull request #12087 from [BEAM-10321] retain environments in flatten for preventing it from be…
09258f2 is described below

commit 09258f27b83f7034856a0231de3d9188ffa8bd6d
Author: Heejong Lee <he...@gmail.com>
AuthorDate: Wed Jun 24 18:31:41 2020 -0700

    [BEAM-10321] retain environments in flatten for preventing it from being fused into stages running in foreign language SDKs
---
 .../apache_beam/runners/portability/fn_api_runner/translations.py   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
index eca8494..34dcf48 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
@@ -1141,7 +1141,8 @@ def sink_flattens(stages, pipeline_context):
                     inputs={local_in: pcoll_in},
                     spec=beam_runner_api_pb2.FunctionSpec(
                         urn=bundle_processor.DATA_OUTPUT_URN,
-                        payload=buffer_id))
+                        payload=buffer_id),
+                    environment_id=transform.environment_id)
             ],
             downstream_side_inputs=frozenset(),
             must_follow=stage.must_follow)
@@ -1155,7 +1156,8 @@ def sink_flattens(stages, pipeline_context):
                   unique_name=transform.unique_name + '/Read',
                   outputs=transform.outputs,
                   spec=beam_runner_api_pb2.FunctionSpec(
-                      urn=bundle_processor.DATA_INPUT_URN, payload=buffer_id))
+                      urn=bundle_processor.DATA_INPUT_URN, payload=buffer_id),
+                  environment_id=transform.environment_id)
           ],
           downstream_side_inputs=stage.downstream_side_inputs,
           must_follow=union(frozenset(flatten_writes), stage.must_follow))