You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/01 04:44:53 UTC

[2/3] beam git commit: Only encode PCollection outputs in Runner API protos.

Only encode PCollection outputs in Runner API protos.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ff44af
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ff44af
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ff44af

Branch: refs/heads/master
Commit: c9ff44afa3fcb47b7f0c4288f4f7d520f063d442
Parents: 0749982
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Mar 31 16:57:01 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Mar 31 21:44:21 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c9ff44af/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ee5904b..0841e5f 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -494,6 +494,10 @@ class AppliedPTransform(object):
     return {str(ix): input for ix, input in enumerate(self.inputs)
             if isinstance(input, pvalue.PCollection)}
 
+  def named_outputs(self):
+    return {str(tag): output for tag, output in self.outputs.items()
+            if isinstance(output, pvalue.PCollection)}
+
   def to_runner_api(self, context):
     from apache_beam.runners.api import beam_runner_api_pb2
     return beam_runner_api_pb2.PTransform(
@@ -507,7 +511,7 @@ class AppliedPTransform(object):
         inputs={tag: context.pcollections.get_id(pc)
                 for tag, pc in self.named_inputs().items()},
         outputs={str(tag): context.pcollections.get_id(out)
-                 for tag, out in self.outputs.items()},
+                 for tag, out in self.named_outputs().items()},
         # TODO(BEAM-115): display_data
         display_data=None)