You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/01/03 01:24:07 UTC

[beam] branch master updated: Correctly handle pass-through "composites" in the Fn Api Runner. (#4326)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d9bf07  Correctly handle pass-through "composites" in the Fn Api Runner. (#4326)
7d9bf07 is described below

commit 7d9bf07f876a82ee5d10a4aca24197e470603f90
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Jan 2 17:24:03 2018 -0800

    Correctly handle pass-through "composites" in the Fn Api Runner. (#4326)
---
 .../python/apache_beam/runners/portability/fn_api_runner.py |  6 +++++-
 .../apache_beam/runners/portability/fn_api_runner_test.py   | 13 +++++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index c5829a4..c921857 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -773,8 +773,12 @@ class FnApiRunner(runner.PipelineRunner):
     def leaf_transforms(root_ids):
       for root_id in root_ids:
         root = pipeline_proto.components.transforms[root_id]
-        if root.spec.urn in known_composites or not root.subtransforms:
+        if root.spec.urn in known_composites:
           yield root_id
+        elif not root.subtransforms:
+          # Make sure its outputs are not a subset of its inputs.
+          if set(root.outputs.values()) - set(root.inputs.values()):
+            yield root_id
         else:
           for leaf in leaf_transforms(root.subtransforms):
             yield leaf
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index c15b329..1cffa26 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -84,7 +84,6 @@ class FnApiRunnerTest(
       side1 = p | 'side1' >> beam.Create([('a', 1)])
       side2 = p | 'side2' >> beam.Create([('b', 2)])
       side = (side1, side2) | beam.Flatten()
-      _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
       assert_that(
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
           equal_to([(None, {'a': 1, 'b': 2})]))
@@ -93,7 +92,6 @@ class FnApiRunnerTest(
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create([None])
       side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
-      _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
       assert_that(
           main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
           equal_to([(None, {'a': [1]})]))
@@ -105,6 +103,17 @@ class FnApiRunnerTest(
       with self.create_pipeline() as p:
         assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
 
+  def test_no_subtransform_composite(self):
+
+    class First(beam.PTransform):
+      def expand(self, pcolls):
+        return pcolls[0]
+
+    with self.create_pipeline() as p:
+      pcoll_a = p | 'a' >> beam.Create(['a'])
+      pcoll_b = p | 'b' >> beam.Create(['b'])
+      assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a']))
+
   def test_progress_metrics(self):
     p = self.create_pipeline()
     if not isinstance(p.runner, fn_api_runner.FnApiRunner):

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.