You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2018/01/03 01:24:07 UTC
[beam] branch master updated: Correctly handle pass-through
"composites" in the Fn Api Runner. (#4326)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7d9bf07 Correctly handle pass-through "composites" in the Fn Api Runner. (#4326)
7d9bf07 is described below
commit 7d9bf07f876a82ee5d10a4aca24197e470603f90
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Jan 2 17:24:03 2018 -0800
Correctly handle pass-through "composites" in the Fn Api Runner. (#4326)
---
.../python/apache_beam/runners/portability/fn_api_runner.py | 6 +++++-
.../apache_beam/runners/portability/fn_api_runner_test.py | 13 +++++++++++--
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index c5829a4..c921857 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -773,8 +773,12 @@ class FnApiRunner(runner.PipelineRunner):
def leaf_transforms(root_ids):
for root_id in root_ids:
root = pipeline_proto.components.transforms[root_id]
- if root.spec.urn in known_composites or not root.subtransforms:
+ if root.spec.urn in known_composites:
yield root_id
+ elif not root.subtransforms:
+ # Make sure its outputs are not a subset of its inputs.
+ if set(root.outputs.values()) - set(root.inputs.values()):
+ yield root_id
else:
for leaf in leaf_transforms(root.subtransforms):
yield leaf
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index c15b329..1cffa26 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -84,7 +84,6 @@ class FnApiRunnerTest(
side1 = p | 'side1' >> beam.Create([('a', 1)])
side2 = p | 'side2' >> beam.Create([('b', 2)])
side = (side1, side2) | beam.Flatten()
- _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
assert_that(
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
equal_to([(None, {'a': 1, 'b': 2})]))
@@ -93,7 +92,6 @@ class FnApiRunnerTest(
with self.create_pipeline() as p:
main = p | 'main' >> beam.Create([None])
side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
- _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
assert_that(
main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
equal_to([(None, {'a': [1]})]))
@@ -105,6 +103,17 @@ class FnApiRunnerTest(
with self.create_pipeline() as p:
assert_that(p | beam.Create(['a', 'b']), equal_to(['a']))
+ def test_no_subtransform_composite(self):
+
+ class First(beam.PTransform):
+ def expand(self, pcolls):
+ return pcolls[0]
+
+ with self.create_pipeline() as p:
+ pcoll_a = p | 'a' >> beam.Create(['a'])
+ pcoll_b = p | 'b' >> beam.Create(['b'])
+ assert_that((pcoll_a, pcoll_b) | First(), equal_to(['a']))
+
def test_progress_metrics(self):
p = self.create_pipeline()
if not isinstance(p.runner, fn_api_runner.FnApiRunner):
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].