You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/04/01 04:45:31 UTC
[2/2] beam git commit: Fix side inputs on dataflow runner.
Fix side inputs on dataflow runner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/07daf3a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/07daf3a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/07daf3a5
Branch: refs/heads/master
Commit: 07daf3a54544ce842165ffe15264e43ebced28ba
Parents: 60901f8
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Fri Mar 31 14:57:44 2017 -0700
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Fri Mar 31 21:45:16 2017 -0700
----------------------------------------------------------------------
.../apache_beam/runners/dataflow/dataflow_runner.py | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/07daf3a5/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index db433df..fe9f8c0 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -287,7 +287,7 @@ class DataflowRunner(PipelineRunner):
def _add_singleton_step(self, label, full_label, tag, input_step):
"""Creates a CollectionToSingleton step used to handle ParDo side inputs."""
# Import here to avoid adding the dependency for local running scenarios.
- from google.cloud.dataflow.internal import apiclient
+ from apache_beam.runners.dataflow.internal import apiclient
step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label)
self.job.proto.steps.append(step.proto)
step.add_property(PropertyNames.USER_NAME, full_label)
@@ -302,7 +302,7 @@ class DataflowRunner(PipelineRunner):
[{PropertyNames.USER_NAME: (
'%s.%s' % (full_label, PropertyNames.OUTPUT)),
PropertyNames.ENCODING: step.encoding,
- PropertyNames.OUTPUT_NAME: PropertyNames.OUTPUT}])
+ PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
return step
def run_Flatten(self, transform_node):
@@ -374,12 +374,10 @@ class DataflowRunner(PipelineRunner):
si_dict = {}
# We must call self._cache.get_pvalue exactly once due to refcounting.
si_labels = {}
- for side_pval in transform_node.side_inputs:
- si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
lookup_label = lambda side_pval: si_labels[side_pval]
for side_pval in transform_node.side_inputs:
assert isinstance(side_pval, AsSideInput)
- si_label = self._get_unique_step_name()
+ si_label = 'SideInput-' + self._get_unique_step_name()
si_full_label = '%s/%s' % (transform_node.full_label, si_label)
self._add_singleton_step(
si_label, si_full_label, side_pval.pvalue.tag,
@@ -388,10 +386,13 @@ class DataflowRunner(PipelineRunner):
'@type': 'OutputReference',
PropertyNames.STEP_NAME: si_label,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}
+ si_labels[side_pval] = si_label
# Now create the step for the ParDo transform being handled.
step = self._add_step(
- TransformNames.DO, transform_node.full_label, transform_node,
+ TransformNames.DO,
+ transform_node.full_label + '/Do' if transform_node.side_inputs else '',
+ transform_node,
transform_node.transform.side_output_tags)
fn_data = self._pardo_fn_data(transform_node, lookup_label)
step.add_property(PropertyNames.SERIALIZED_FN, pickler.dumps(fn_data))