You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:23 UTC

[4/5] beam git commit: More informative references in the proto representation.

More informative references in the proto representation.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ee7422f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ee7422f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ee7422f

Branch: refs/heads/master
Commit: 2ee7422f1dbd00e3c40ac95e9eaddc745d46fe65
Parents: b179eca
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:21:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py                 | 3 ++-
 sdks/python/apache_beam/runners/pipeline_context.py | 8 ++++----
 2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2ee7422f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8553f7c..e7c2322 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -730,7 +730,8 @@ class AppliedPTransform(object):
     return beam_runner_api_pb2.PTransform(
         unique_name=self.full_label,
         spec=transform_to_runner_api(self.transform, context),
-        subtransforms=[context.transforms.get_id(part) for part in self.parts],
+        subtransforms=[context.transforms.get_id(part, label=part.full_label)
+                       for part in self.parts],
         # TODO(BEAM-115): Side inputs.
         inputs={tag: context.pcollections.get_id(pc)
                 for tag, pc in self.named_inputs().items()},

http://git-wip-us.apache.org/repos/asf/beam/blob/2ee7422f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index a40069b..f4de42a 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -43,18 +43,18 @@ class _PipelineContextMap(object):
     self._id_to_proto = proto_map if proto_map else {}
     self._counter = 0
 
-  def _unique_ref(self, obj=None):
+  def _unique_ref(self, obj=None, label=None):
     self._counter += 1
     return "ref_%s_%s_%s" % (
-        self._obj_type.__name__, type(obj).__name__, self._counter)
+        self._obj_type.__name__, label or type(obj).__name__, self._counter)
 
   def populate_map(self, proto_map):
     for id, proto in self._id_to_proto.items():
       proto_map[id].CopyFrom(proto)
 
-  def get_id(self, obj):
+  def get_id(self, obj, label=None):
     if obj not in self._obj_to_id:
-      id = self._unique_ref(obj)
+      id = self._unique_ref(obj, label)
       self._id_to_obj[id] = obj
       self._obj_to_id[obj] = id
       self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)