Posted to commits@beam.apache.org by ib...@apache.org on 2020/05/04 17:53:43 UTC

[beam] branch release-2.21.0 updated: [BEAM-9870] Always generate Dataflow-compatible FnApi protos. (#11593)

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.21.0 by this push:
     new 7b60b66  [BEAM-9870] Always generate Dataflow-compatible FnApi protos. (#11593)
     new 26b60be  Merge pull request #11599 from robertwb/release-2.21.0
7b60b66 is described below

commit 7b60b664742bec1ae7140132d6ab79e2580bccb4
Author: Robert Bradshaw <ro...@google.com>
AuthorDate: Mon May 4 09:28:19 2020 -0700

    [BEAM-9870] Always generate Dataflow-compatible FnApi protos. (#11593)
---
 .../runners/dataflow/dataflow_runner.py            | 57 +++++++++++-----------
 .../runners/dataflow/dataflow_runner_test.py       |  3 +-
 2 files changed, 31 insertions(+), 29 deletions(-)
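
For readers skimming the patch: the side-input visitor is now applied to every Dataflow pipeline (not just FnAPI jobs), the ITERABLE access-pattern fix-up is gated on "use_unified_worker or not use_fn_api", and the transform node's side inputs are only replaced on the FnAPI path. The sketch below is a simplified, hypothetical stand-in for that flag logic; the function name is illustrative and only the condition visible in the diff is reproduced.

    # Illustrative sketch (not Beam internals) of the new gating in
    # side_input_visitor: when should the ITERABLE side-input access
    # pattern be patched into a Dataflow-compatible form?
    def should_patch_iterable_access_pattern(use_unified_worker, use_fn_api):
        return use_unified_worker or not use_fn_api

    # Legacy (non-FnAPI) job: now patched; previously the visitor was
    # never run for such jobs at all.
    assert should_patch_iterable_access_pattern(False, False)
    # FnAPI job on the legacy worker: left as-is, same as before.
    assert not should_patch_iterable_access_pattern(False, True)
    # Unified worker: patched, same as before.
    assert should_patch_iterable_access_pattern(True, True)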

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index d0aaa8f..5ca1b8c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -320,7 +320,7 @@ class DataflowRunner(PipelineRunner):
     return SetPDoneVisitor(pipeline)
 
   @staticmethod
-  def side_input_visitor(use_unified_worker=False):
+  def side_input_visitor(use_unified_worker=False, use_fn_api=False):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.pipeline import PipelineVisitor
@@ -338,7 +338,7 @@ class DataflowRunner(PipelineRunner):
           for ix, side_input in enumerate(transform_node.side_inputs):
             access_pattern = side_input._side_input_data().access_pattern
             if access_pattern == common_urns.side_inputs.ITERABLE.urn:
-              if use_unified_worker:
+              if use_unified_worker or not use_fn_api:
                 # TODO(BEAM-9173): Stop patching up the access pattern to
                 # appease Dataflow when using the UW and hardcode the output
                 # type to be Any since the Dataflow JSON and pipeline proto
@@ -377,8 +377,9 @@ class DataflowRunner(PipelineRunner):
                   'Unsupported access pattern for %r: %r' %
                   (transform_node.full_label, access_pattern))
             new_side_inputs.append(new_side_input)
-          transform_node.side_inputs = new_side_inputs
-          transform_node.transform.side_inputs = new_side_inputs
+          if use_fn_api:
+            transform_node.side_inputs = new_side_inputs
+            transform_node.transform.side_inputs = new_side_inputs
 
     return SideInputVisitor()
 
@@ -446,9 +447,10 @@ class DataflowRunner(PipelineRunner):
     self._maybe_add_unified_worker_missing_options(options)
 
     # Convert all side inputs into a form acceptable to Dataflow.
-    if apiclient._use_fnapi(options):
-      pipeline.visit(
-          self.side_input_visitor(apiclient._use_unified_worker(options)))
+    pipeline.visit(
+        self.side_input_visitor(
+            apiclient._use_unified_worker(options),
+            apiclient._use_fnapi(options)))
 
     # Performing configured PTransform overrides.  Note that this is currently
     # done before Runner API serialization, since the new proto needs to contain
@@ -966,32 +968,31 @@ class DataflowRunner(PipelineRunner):
     transform_id = self.proto_context.transforms.get_id(transform_node)
     use_fnapi = apiclient._use_fnapi(options)
     use_unified_worker = apiclient._use_unified_worker(options)
+    # Patch side input ids to be unique across a given pipeline.
+    if (label_renames and
+        transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
+      # Patch PTransform proto.
+      for old, new in iteritems(label_renames):
+        transform_proto.inputs[new] = transform_proto.inputs[old]
+        del transform_proto.inputs[old]
+
+      # Patch ParDo proto.
+      proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
+      proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type)
+      for old, new in iteritems(label_renames):
+        proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
+        del proto.side_inputs[old]
+      transform_proto.spec.payload = proto.SerializeToString()
+      # We need to update the pipeline proto.
+      del self.proto_pipeline.components.transforms[transform_id]
+      (
+          self.proto_pipeline.components.transforms[transform_id].CopyFrom(
+              transform_proto))
     # The data transmitted in SERIALIZED_FN is different depending on whether
     # this is a fnapi pipeline or not.
     if (use_fnapi and
         (transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn or
          use_unified_worker)):
-      # Patch side input ids to be unique across a given pipeline.
-      if (label_renames and
-          transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn):
-        # Patch PTransform proto.
-        for old, new in iteritems(label_renames):
-          transform_proto.inputs[new] = transform_proto.inputs[old]
-          del transform_proto.inputs[old]
-
-        # Patch ParDo proto.
-        proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn]
-        proto = proto_utils.parse_Bytes(
-            transform_proto.spec.payload, proto_type)
-        for old, new in iteritems(label_renames):
-          proto.side_inputs[new].CopyFrom(proto.side_inputs[old])
-          del proto.side_inputs[old]
-        transform_proto.spec.payload = proto.SerializeToString()
-        # We need to update the pipeline proto.
-        del self.proto_pipeline.components.transforms[transform_id]
-        (
-            self.proto_pipeline.components.transforms[transform_id].CopyFrom(
-                transform_proto))
       serialized_data = transform_id
     else:
       serialized_data = pickler.dumps(
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c939274..ba56164 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -436,7 +436,8 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         beam.pvalue.AsSingleton(pc),
         beam.pvalue.AsMultiMap(pc))
     applied_transform = AppliedPTransform(None, transform, "label", [pc])
-    DataflowRunner.side_input_visitor().visit_transform(applied_transform)
+    DataflowRunner.side_input_visitor(
+        use_fn_api=True).visit_transform(applied_transform)
     self.assertEqual(2, len(applied_transform.side_inputs))
     for side_input in applied_transform.side_inputs:
       self.assertEqual(
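
A second behavioral change, visible in the large dataflow_runner.py hunk above, is that the label_renames patching of the PTransform proto's inputs and the ParDo proto's side_inputs map now runs for any ParDo with renamed side-input labels rather than only inside the use_fnapi branch. Below is a toy, self-contained sketch of that key-rename pattern, using a plain dict in place of the proto map fields (all names and values are purely illustrative):

    # Stand-in for renaming keys, mirroring the loop over label_renames
    # applied to transform_proto.inputs and proto.side_inputs in the diff.
    label_renames = {'side0-old': 'side0-unique'}
    inputs = {'side0-old': 'pcoll_1', 'main': 'pcoll_0'}
    for old, new in label_renames.items():
        inputs[new] = inputs[old]
        del inputs[old]
    assert inputs == {'main': 'pcoll_0', 'side0-unique': 'pcoll_1'}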