You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/20 21:31:01 UTC
[beam] branch master updated: Fix for side inputs following flatten/gbk. (#4281)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d468d4a Fix for side inputs following flatten/gbk. (#4281)
d468d4a is described below
commit d468d4a927031d9d181f9fd39d7c558abeeef8d1
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Wed Dec 20 13:30:58 2017 -0800
Fix for side inputs following flatten/gbk. (#4281)
---
.../apache_beam/runners/portability/fn_api_runner.py | 4 ++--
.../runners/portability/fn_api_runner_test.py | 20 ++++++++++++++++++++
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 683f52d..c5829a4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -525,7 +525,7 @@ class FnApiRunner(runner.PipelineRunner):
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
payload=param))],
- downstream_side_inputs=frozenset(),
+ downstream_side_inputs=stage.downstream_side_inputs,
must_follow=union(frozenset([gbk_write]), stage.must_follow))
else:
yield stage
@@ -594,7 +594,7 @@ class FnApiRunner(runner.PipelineRunner):
spec=beam_runner_api_pb2.FunctionSpec(
urn=bundle_processor.DATA_INPUT_URN,
payload=param))],
- downstream_side_inputs=frozenset(),
+ downstream_side_inputs=stage.downstream_side_inputs,
must_follow=union(frozenset(flatten_writes), stage.must_follow))
else:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index eb297ab..c15b329 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -78,6 +78,26 @@ class FnApiRunnerTest(
(9, range(7, 10))]),
label='windowed')
+ def test_flattened_side_input(self):
+ with self.create_pipeline() as p:
+ main = p | 'main' >> beam.Create([None])
+ side1 = p | 'side1' >> beam.Create([('a', 1)])
+ side2 = p | 'side2' >> beam.Create([('b', 2)])
+ side = (side1, side2) | beam.Flatten()
+ _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
+ assert_that(
+ main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
+ equal_to([(None, {'a': 1, 'b': 2})]))
+
+ def test_gbk_side_input(self):
+ with self.create_pipeline() as p:
+ main = p | 'main' >> beam.Create([None])
+ side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
+ _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
+ assert_that(
+ main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
+ equal_to([(None, {'a': [1]})]))
+
def test_assert_that(self):
# TODO: figure out a way for fn_api_runner to parse and raise the
# underlying exception.
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.