You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/04 17:53:43 UTC
[beam] branch release-2.21.0 updated: [BEAM-9870] Always generate Dataflow-compatible FnApi protos. (#11593)
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.21.0 by this push:
new 7b60b66 [BEAM-9870] Always generate Dataflow-compatible FnApi protos. (#11593)
new 26b60be Merge pull request #11599 from robertwb/release-2.21.0
7b60b66 is described below
commit 7b60b664742bec1ae7140132d6ab79e2580bccb4
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Mon May 4 09:28:19 2020 -0700
[BEAM-9870] Always generate Dataflow-compatible FnApi protos. (#11593)
---
.../runners/dataflow/dataflow_runner.py | 57 +++++++++++-----------
.../runners/dataflow/dataflow_runner_test.py | 3 +-
2 files changed, 31 insertions(+), 29 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d0aaa8f..5ca1b8c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -320,7 +320,7 @@ class DataflowRunner(PipelineRunner):
return SetPDoneVisitor(pipeline)
@staticmethod
- def side_input_visitor(use_unified_worker=False):
+ def side_input_visitor(use_unified_worker=False, use_fn_api=False):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.pipeline import PipelineVisitor
@@ -338,7 +338,7 @@ class DataflowRunner(PipelineRunner):
for ix, side_input in enumerate(transform_node.side_inputs):
access_pattern = side_input._side_input_data().access_pattern
if access_pattern == common_urns.side_inputs.ITERABLE.urn:
- if use_unified_worker:
+ if use_unified_worker or not use_fn_api:
# TODO(BEAM-9173): Stop patching up the access pattern to
# appease Dataflow when using the UW and hardcode the output
# type to be Any since the Dataflow JSON and pipeline proto
@@ -377,8 +377,9 @@ class DataflowRunner(PipelineRunner):
'Unsupported access pattern for %r: %r' %
(transform_node.full_label, access_pattern))
new_side_inputs.append(new_side_input)
- transform_node.side_inputs = new_side_inputs
- transform_node.transform.side_inputs = new_side_inputs
+ if use_fn_api:
+ transform_node.side_inputs = new_side_inputs
+ transform_node.transform.side_inputs = new_side_inputs
return SideInputVisitor()
@@ -446,9 +447,10 @@ class DataflowRunner(PipelineRunner):
self._maybe_add_unified_worker_missing_options(options)
# Convert all side inputs into a form acceptable to Dataflow.
- if apiclient._use_fnapi(options):
- pipeline.visit(
- self.side_input_visitor(apiclient._use_unified_worker(options)))
+ pipeline.visit(
+ self.side_input_visitor(
+ apiclient._use_unified_worker(options),
+ apiclient._use_fnapi(options)))
# Performing configured PTransform overrides. Note that this is currently
# done before Runner API serialization, since the new proto needs to contain
@@ -966,32 +968,31 @@ class DataflowRunner(PipelineRunner):
transform_id = self.proto_context.transforms.get_id(transform_node)
use_fnapi = apiclient._use_fnapi(options)
use_unified_worker = apiclient._use_unified_worker(options)
+ # Patch side input ids to be unique across a given pipeline.
+ if (label_renames and
+ transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+ # Patch PTransform proto.
+ for old, new in iteritems(label_renames):
+ transform_proto.inputs[new] = transform_proto.inputs[old]
+ del transform_proto.inputs[old]
+
+ # Patch ParDo proto.
+ proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
+ proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
+ for old, new in iteritems(label_renames):
+ proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
+ del proto.side_inputs[old]
+ transform_proto.spec.payload = proto.SerializeToString()
+ # We need to update the pipeline proto.
+ del self.proto_pipeline.components.transforms[transform_id]
+ (
+ self.proto_pipeline.components.transforms[transform_id].CopyFrom(
+ transform_proto))
# The data transmitted in SERIALIZED_FN is different depending on whether
# this is a fnapi pipeline or not.
if (use_fnapi and
(transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
use_unified_worker)):
- # Patch side input ids to be unique across a given pipeline.
- if (label_renames and
- transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
- # Patch PTransform proto.
- for old, new in iteritems(label_renames):
- transform_proto.inputs[new] = transform_proto.inputs[old]
- del transform_proto.inputs[old]
-
- # Patch ParDo proto.
- proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
- proto = proto_utils.parse_Bytes(
- transform_proto.spec.payload, proto_type)
- for old, new in iteritems(label_renames):
- proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
- del proto.side_inputs[old]
- transform_proto.spec.payload = proto.SerializeToString()
- # We need to update the pipeline proto.
- del self.proto_pipeline.components.transforms[transform_id]
- (
- self.proto_pipeline.components.transforms[transform_id].CopyFrom(
- transform_proto))
serialized_data = transform_id
else:
serialized_data = pickler.dumps(
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c939274..ba56164 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -436,7 +436,8 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
beam.pvalue.AsSingleton(pc),
beam.pvalue.AsMultiMap(pc))
applied_transform = AppliedPTransform(None, transform, "label", [pc])
- DataflowRunner.side_input_visitor().visit_transform(applied_transform)
+ DataflowRunner.side_input_visitor(
+ use_fn_api=True).visit_transform(applied_transform)
self.assertEqual(2, len(applied_transform.side_inputs))
for side_input in applied_transform.side_inputs:
self.assertEqual(