You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/23 23:47:10 UTC
[04/12] incubator-beam git commit: Fix multi-input named PTransforms.
Fix multi-input named PTransforms.
Now delegate the __ror__ logic entirely for the naming wrapper.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937cf69e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937cf69e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937cf69e
Branch: refs/heads/python-sdk
Commit: 937cf69e958d4a82fb274f311de248930298db69
Parents: 9fe102a
Author: Robert Bradshaw <ro...@google.com>
Authored: Fri Jul 22 14:32:33 2016 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700
----------------------------------------------------------------------
sdks/python/apache_beam/transforms/ptransform.py | 7 +++++--
1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937cf69e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da8b671..b652bca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -400,7 +400,7 @@ class PTransform(WithTypeHints):
else:
return NotImplemented
- def __ror__(self, left):
+ def __ror__(self, left, label=None):
"""Used to apply this PTransform to non-PValues, e.g., a tuple."""
pvalueish, pvalues = self._extract_input_pvalues(left)
pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
@@ -434,7 +434,7 @@ class PTransform(WithTypeHints):
if not isinstance(v, pvalue.PValue) and v is not None}
pvalueish = _SetInputPValues().visit(pvalueish, replacements)
self.pipeline = p
- result = p.apply(self, pvalueish)
+ result = p.apply(self, pvalueish, label)
if deferred:
return result
else:
@@ -720,5 +720,8 @@ class _NamedPTransform(PTransform):
super(_NamedPTransform, self).__init__(label)
self.transform = transform
+ def __ror__(self, pvalueish):
+ return self.transform.__ror__(pvalueish, self.label)
+
def apply(self, pvalue):
raise RuntimeError("Should never be applied directly.")