You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2018/01/30 06:56:47 UTC

[beam] branch master updated: [BEAM-3557] Sets parent pointer of AppliedPTransform objects correctly (#4532)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b40982  [BEAM-3557] Sets parent pointer of AppliedPTransform objects correctly (#4532)
7b40982 is described below

commit 7b40982e7d429c57dfaa0f25c2daf24e20d42b26
Author: Chamikara Jayalath <ch...@google.com>
AuthorDate: Mon Jan 29 22:56:43 2018 -0800

    [BEAM-3557] Sets parent pointer of AppliedPTransform objects correctly (#4532)
    
    Sets parent pointer correctly when constructing AppliedPTransform objects from a runner API proto.
---
 sdks/python/apache_beam/pipeline.py      |  7 +++++--
 sdks/python/apache_beam/pipeline_test.py | 14 ++++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index d3d1932..abb4359 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -791,8 +791,11 @@ class AppliedPTransform(object):
       for si, pcoll in zip(result.transform.side_inputs, side_inputs):
         si.pvalue = pcoll
       result.side_inputs = tuple(result.transform.side_inputs)
-    result.parts = [
-        context.transforms.get_by_id(id) for id in proto.subtransforms]
+    result.parts = []
+    for transform_id in proto.subtransforms:
+      part = context.transforms.get_by_id(transform_id)
+      part.parent = result
+      result.parts.append(part)
     result.outputs = {
         None if tag == 'None' else tag: context.pcollections.get_by_id(id)
         for tag, id in proto.outputs.items()}
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 634483e..34ec48e 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -511,6 +511,20 @@ class RunnerApiTest(unittest.TestCase):
     p.to_runner_api()
     self.assertEqual(MyPTransform.pickle_count[0], 20)
 
+  def test_parent_pointer(self):
+    class MyPTransform(beam.PTransform):
+
+      def expand(self, p):
+        self.p = p
+        return p | beam.Create([None])
+
+    p = beam.Pipeline()
+    p | MyPTransform()  # pylint: disable=expression-not-assigned
+    p = Pipeline.from_runner_api(Pipeline.to_runner_api(p), None, None)
+    self.assertIsNotNone(p.transforms_stack[0].parts[0].parent)
+    self.assertEquals(p.transforms_stack[0].parts[0].parent,
+                      p.transforms_stack[0])
+
 
 class DirectRunnerRetryTests(unittest.TestCase):
 

-- 
To stop receiving notification emails like this one, please contact
chamikara@apache.org.