You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/01/30 06:56:47 UTC
[beam] branch master updated: [BEAM-3557] Sets parent pointer of
AppliedPTransform objects correctly (#4532)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7b40982 [BEAM-3557] Sets parent pointer of AppliedPTransform objects correctly (#4532)
7b40982 is described below
commit 7b40982e7d429c57dfaa0f25c2daf24e20d42b26
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Mon Jan 29 22:56:43 2018 -0800
[BEAM-3557] Sets parent pointer of AppliedPTransform objects correctly (#4532)
Sets parent pointer correctly when constructing AppliedPTransform objects from a runner API proto.
---
sdks/python/apache_beam/pipeline.py | 7 +++++--
sdks/python/apache_beam/pipeline_test.py | 14 ++++++++++++++
2 files changed, 19 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index d3d1932..abb4359 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -791,8 +791,11 @@ class AppliedPTransform(object):
for si, pcoll in zip(result.transform.side_inputs, side_inputs):
si.pvalue = pcoll
result.side_inputs = tuple(result.transform.side_inputs)
- result.parts = [
- context.transforms.get_by_id(id) for id in proto.subtransforms]
+ result.parts = []
+ for transform_id in proto.subtransforms:
+ part = context.transforms.get_by_id(transform_id)
+ part.parent = result
+ result.parts.append(part)
result.outputs = {
None if tag == 'None' else tag: context.pcollections.get_by_id(id)
for tag, id in proto.outputs.items()}
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 634483e..34ec48e 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -511,6 +511,20 @@ class RunnerApiTest(unittest.TestCase):
p.to_runner_api()
self.assertEqual(MyPTransform.pickle_count[0], 20)
+ def test_parent_pointer(self):
+ class MyPTransform(beam.PTransform):
+
+ def expand(self, p):
+ self.p = p
+ return p | beam.Create([None])
+
+ p = beam.Pipeline()
+ p | MyPTransform() # pylint: disable=expression-not-assigned
+ p = Pipeline.from_runner_api(Pipeline.to_runner_api(p), None, None)
+ self.assertIsNotNone(p.transforms_stack[0].parts[0].parent)
+ self.assertEquals(p.transforms_stack[0].parts[0].parent,
+ p.transforms_stack[0])
+
class DirectRunnerRetryTests(unittest.TestCase):
--
To stop receiving notification emails like this one, please contact
chamikara@apache.org.