You are viewing a plain-text version of this content; the canonical link ("here") was not preserved in this copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/07/17 21:35:13 UTC
[1/2] beam git commit: Closes #3520
Repository: beam
Updated Branches:
refs/heads/master 7e4719cd0 -> 532256e88
Closes #3520
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/532256e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/532256e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/532256e8
Branch: refs/heads/master
Commit: 532256e8811b790fdf25fb4e11b7c2b89383761a
Parents: 7e4719c 7257507
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 17 14:33:01 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 17 14:33:01 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_runner.py | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[2/2] beam git commit: Improving labeling of side inputs for Dataflow
Posted by ro...@apache.org.
Improving labeling of side inputs for Dataflow
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7257507d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7257507d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7257507d
Branch: refs/heads/master
Commit: 7257507d939271a91287837c20fcdde37dc1ddeb
Parents: 7e4719c
Author: Pablo <pa...@google.com>
Authored: Fri Jul 7 13:49:47 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 17 14:33:01 2017 -0700
----------------------------------------------------------------------
.../runners/dataflow/dataflow_runner.py | 18 ++++++++++++++++--
1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7257507d/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 059e139..89c18d4 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -21,6 +21,7 @@ The runner will create a JSON description of the job graph and then submit it
to the Dataflow Service for remote execution by a worker.
"""
+from collections import defaultdict
import logging
import threading
import time
@@ -485,11 +486,24 @@ class DataflowRunner(PipelineRunner):
si_dict = {}
# We must call self._cache.get_pvalue exactly once due to refcounting.
si_labels = {}
+ full_label_counts = defaultdict(int)
lookup_label = lambda side_pval: si_labels[side_pval]
for side_pval in transform_node.side_inputs:
assert isinstance(side_pval, AsSideInput)
- si_label = 'SideInput-' + self._get_unique_step_name()
- si_full_label = '%s/%s' % (transform_node.full_label, si_label)
+ step_number = self._get_unique_step_name()
+ si_label = 'SideInput-' + step_number
+ pcollection_label = '%s.%s' % (
+ side_pval.pvalue.producer.full_label.split('/')[-1],
+ side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
+ si_full_label = '%s/%s(%s.%s)' % (transform_node.full_label,
+ side_pval.__class__.__name__,
+ pcollection_label,
+ full_label_counts[pcollection_label])
+
+ # Count the number of times the same PCollection is a side input
+ # to the same ParDo.
+ full_label_counts[pcollection_label] += 1
+
self._add_singleton_step(
si_label, si_full_label, side_pval.pvalue.tag,
self._cache.get_pvalue(side_pval.pvalue))