You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/23 23:40:14 UTC
[1/2] beam git commit: Avoid pickling the entire pipeline per-transform.
Repository: beam
Updated Branches:
refs/heads/master 9acce7150 -> a90e40ae9
Avoid pickling the entire pipeline per-transform.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/903da41a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/903da41a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/903da41a
Branch: refs/heads/master
Commit: 903da41ac5395e76c44ef8ae1c8a695569e23abb
Parents: 9acce71
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jun 23 15:01:42 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 23 16:39:51 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 7 +++++++
sdks/python/apache_beam/pipeline_test.py | 18 ++++++++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/903da41a/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index d84a2b7..724c87d 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -466,6 +466,13 @@ class Pipeline(object):
self.transforms_stack.pop()
return pvalueish_result
+ def __reduce__(self):
+ # Some transforms contain a reference to their enclosing pipeline,
+ # which in turn reference all other transforms (resulting in quadratic
+ # time/space to pickle each transform individually). As we don't
+ # require pickled pipelines to be executable, break the chain here.
+ return str, ('Pickled pipeline stub.',)
+
def _verify_runner_api_compatible(self):
class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment
ok = True # Really a nonlocal.
http://git-wip-us.apache.org/repos/asf/beam/blob/903da41a/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index f9b894f..aad0143 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -480,6 +480,24 @@ class RunnerApiTest(unittest.TestCase):
p2 = Pipeline.from_runner_api(proto, p.runner, p._options)
p2.run()
+ def test_pickling(self):
+ class MyPTransform(beam.PTransform):
+ pickle_count = [0]
+
+ def expand(self, p):
+ self.p = p
+ return p | beam.Create([None])
+
+ def __reduce__(self):
+ self.pickle_count[0] += 1
+ return str, ()
+
+ p = beam.Pipeline()
+ for k in range(20):
+ p | 'Iter%s' % k >> MyPTransform() # pylint: disable=expression-not-assigned
+ p.to_runner_api()
+ self.assertEqual(MyPTransform.pickle_count[0], 20)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
[2/2] beam git commit: Closes #3433
Posted by ro...@apache.org.
Closes #3433
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a90e40ae
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a90e40ae
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a90e40ae
Branch: refs/heads/master
Commit: a90e40ae9f748c1c8392c1198469f3229a06ed70
Parents: 9acce71 903da41
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jun 23 16:39:52 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 23 16:39:52 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 7 +++++++
sdks/python/apache_beam/pipeline_test.py | 18 ++++++++++++++++++
2 files changed, 25 insertions(+)
----------------------------------------------------------------------