You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/06/16 20:54:08 UTC

[1/2] beam git commit: Populate PBegin input when decoding from Runner API

Repository: beam
Updated Branches:
  refs/heads/master e827642ef -> 0cabdf6e7


Populate PBegin input when decoding from Runner API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4519681e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4519681e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4519681e

Branch: refs/heads/master
Commit: 4519681ec3d2fb723a514128d7c9c531c8de9dbf
Parents: e827642
Author: Charles Chen <cc...@google.com>
Authored: Thu Jun 15 15:27:18 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 16 13:53:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4519681e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ab77956..d84a2b7 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -515,7 +515,18 @@ class Pipeline(object):
     p.applied_labels = set([
         t.unique_name for t in proto.components.transforms.values()])
     for id in proto.components.pcollections:
-      context.pcollections.get_by_id(id).pipeline = p
+      pcollection = context.pcollections.get_by_id(id)
+      pcollection.pipeline = p
+
+    # Inject PBegin input where necessary.
+    from apache_beam.io.iobase import Read
+    from apache_beam.transforms.core import Create
+    has_pbegin = [Read, Create]
+    for id in proto.components.transforms:
+      transform = context.transforms.get_by_id(id)
+      if not transform.inputs and transform.transform.__class__ in has_pbegin:
+        transform.inputs = (pvalue.PBegin(p),)
+
     return p
 
 


[2/2] beam git commit: Closes #3373

Posted by ro...@apache.org.
Closes #3373


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0cabdf6e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0cabdf6e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0cabdf6e

Branch: refs/heads/master
Commit: 0cabdf6e776363e639f18779744da73f9d29bb5a
Parents: e827642 4519681
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Jun 16 13:54:00 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Jun 16 13:54:00 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------