You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/20 21:31:01 UTC

[beam] branch master updated: Fix for side inputs following flatten/gbk. (#4281)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new d468d4a  Fix for side inputs following flatten/gbk. (#4281)
d468d4a is described below

commit d468d4a927031d9d181f9fd39d7c558abeeef8d1
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Wed Dec 20 13:30:58 2017 -0800

    Fix for side inputs following flatten/gbk. (#4281)
---
 .../apache_beam/runners/portability/fn_api_runner.py |  4 ++--
 .../runners/portability/fn_api_runner_test.py        | 20 ++++++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 683f52d..c5829a4 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -525,7 +525,7 @@ class FnApiRunner(runner.PipelineRunner):
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
                       payload=param))],
-              downstream_side_inputs=frozenset(),
+              downstream_side_inputs=stage.downstream_side_inputs,
               must_follow=union(frozenset([gbk_write]), stage.must_follow))
         else:
           yield stage
@@ -594,7 +594,7 @@ class FnApiRunner(runner.PipelineRunner):
                   spec=beam_runner_api_pb2.FunctionSpec(
                       urn=bundle_processor.DATA_INPUT_URN,
                       payload=param))],
-              downstream_side_inputs=frozenset(),
+              downstream_side_inputs=stage.downstream_side_inputs,
               must_follow=union(frozenset(flatten_writes), stage.must_follow))
 
         else:
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index eb297ab..c15b329 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -78,6 +78,26 @@ class FnApiRunnerTest(
               (9, range(7, 10))]),
           label='windowed')
 
+  def test_flattened_side_input(self):
+    with self.create_pipeline() as p:
+      main = p | 'main' >> beam.Create([None])
+      side1 = p | 'side1' >> beam.Create([('a', 1)])
+      side2 = p | 'side2' >> beam.Create([('b', 2)])
+      side = (side1, side2) | beam.Flatten()
+      _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
+      assert_that(
+          main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
+          equal_to([(None, {'a': 1, 'b': 2})]))
+
+  def test_gbk_side_input(self):
+    with self.create_pipeline() as p:
+      main = p | 'main' >> beam.Create([None])
+      side = p | 'side' >> beam.Create([('a', 1)]) | beam.GroupByKey()
+      _ = main | 'Do' >> beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side))
+      assert_that(
+          main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)),
+          equal_to([(None, {'a': [1]})]))
+
   def test_assert_that(self):
     # TODO: figure out a way for fn_api_runner to parse and raise the
     # underlying exception.

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.