You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/08/02 00:33:23 UTC
[4/5] beam git commit: More informative references in the proto representation.
More informative references in the proto representation.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2ee7422f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2ee7422f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2ee7422f
Branch: refs/heads/master
Commit: 2ee7422f1dbd00e3c40ac95e9eaddc745d46fe65
Parents: b179eca
Author: Robert Bradshaw <ro...@google.com>
Authored: Wed Jul 26 18:21:47 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Aug 1 17:32:59 2017 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/pipeline.py | 3 ++-
sdks/python/apache_beam/runners/pipeline_context.py | 8 ++++----
2 files changed, 6 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/2ee7422f/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 8553f7c..e7c2322 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -730,7 +730,8 @@ class AppliedPTransform(object):
return beam_runner_api_pb2.PTransform(
unique_name=self.full_label,
spec=transform_to_runner_api(self.transform, context),
- subtransforms=[context.transforms.get_id(part) for part in self.parts],
+ subtransforms=[context.transforms.get_id(part, label=part.full_label)
+ for part in self.parts],
# TODO(BEAM-115): Side inputs.
inputs={tag: context.pcollections.get_id(pc)
for tag, pc in self.named_inputs().items()},
http://git-wip-us.apache.org/repos/asf/beam/blob/2ee7422f/sdks/python/apache_beam/runners/pipeline_context.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index a40069b..f4de42a 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -43,18 +43,18 @@ class _PipelineContextMap(object):
self._id_to_proto = proto_map if proto_map else {}
self._counter = 0
- def _unique_ref(self, obj=None):
+ def _unique_ref(self, obj=None, label=None):
self._counter += 1
return "ref_%s_%s_%s" % (
- self._obj_type.__name__, type(obj).__name__, self._counter)
+ self._obj_type.__name__, label or type(obj).__name__, self._counter)
def populate_map(self, proto_map):
for id, proto in self._id_to_proto.items():
proto_map[id].CopyFrom(proto)
- def get_id(self, obj):
+ def get_id(self, obj, label=None):
if obj not in self._obj_to_id:
- id = self._unique_ref(obj)
+ id = self._unique_ref(obj, label)
self._id_to_obj[id] = obj
self._obj_to_id[obj] = id
self._id_to_proto[id] = obj.to_runner_api(self._pipeline_context)