You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/05 16:55:05 UTC
[2/2] beam git commit: Update refcounts after pipeline reconstruction.
Update refcounts after pipeline reconstruction.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/54360814
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/54360814
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/54360814
Branch: refs/heads/master
Commit: 54360814fbc59f69e10f4bac52cfeeea5e044cf9
Parents: e2a2836
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Wed Apr 5 08:37:33 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Wed Apr 5 09:54:48 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 1 +
sdks/python/apache_beam/transforms/ptransform_test.py | 6 ++++++
2 files changed, 7 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8506b85..fdb9a9d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -546,4 +546,5 @@ class AppliedPTransform(object):
if pc not in result.inputs:
pc.producer = result
pc.tag = tag
+ result.update_input_refcounts()
return result
http://git-wip-us.apache.org/repos/asf/beam/blob/54360814/sdks/python/apache_beam/transforms/ptransform_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 37ff2a8..5889ab5 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -435,6 +435,12 @@ class PTransformTest(unittest.TestCase):
     assert_that(result, equal_to([]))
     pipeline.run()
 
+  def test_flatten_same_pcollections(self):
+    pipeline = TestPipeline()
+    pc = pipeline | beam.Create(['a', 'b'])
+    assert_that((pc, pc, pc) | beam.Flatten(), equal_to(['a', 'b'] * 3))
+    pipeline.run()
+
   def test_flatten_pcollections_in_iterable(self):
     pipeline = TestPipeline()
     pcoll_1 = pipeline | 'Start 1' >> beam.Create([0, 1, 2, 3])