You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/07/17 21:35:13 UTC

[1/2] beam git commit: Closes #3520

Repository: beam
Updated Branches:
  refs/heads/master 7e4719cd0 -> 532256e88


Closes #3520


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/532256e8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/532256e8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/532256e8

Branch: refs/heads/master
Commit: 532256e8811b790fdf25fb4e11b7c2b89383761a
Parents: 7e4719c 7257507
Author: Robert Bradshaw <ro...@google.com>
Authored: Mon Jul 17 14:33:01 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 17 14:33:01 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py           | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------



[2/2] beam git commit: Improving labeling of side inputs for Dataflow

Posted by ro...@apache.org.
Improving labeling of side inputs for Dataflow


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7257507d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7257507d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7257507d

Branch: refs/heads/master
Commit: 7257507d939271a91287837c20fcdde37dc1ddeb
Parents: 7e4719c
Author: Pablo <pa...@google.com>
Authored: Fri Jul 7 13:49:47 2017 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Mon Jul 17 14:33:01 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py           | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7257507d/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 059e139..89c18d4 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -21,6 +21,7 @@ The runner will create a JSON description of the job graph and then submit it
 to the Dataflow Service for remote execution by a worker.
 """
 
+from collections import defaultdict
 import logging
 import threading
 import time
@@ -485,11 +486,24 @@ class DataflowRunner(PipelineRunner):
     si_dict = {}
     # We must call self._cache.get_pvalue exactly once due to refcounting.
     si_labels = {}
+    full_label_counts = defaultdict(int)
     lookup_label = lambda side_pval: si_labels[side_pval]
     for side_pval in transform_node.side_inputs:
       assert isinstance(side_pval, AsSideInput)
-      si_label = 'SideInput-' + self._get_unique_step_name()
-      si_full_label = '%s/%s' % (transform_node.full_label, si_label)
+      step_number = self._get_unique_step_name()
+      si_label = 'SideInput-' + step_number
+      pcollection_label = '%s.%s' % (
+          side_pval.pvalue.producer.full_label.split('/')[-1],
+          side_pval.pvalue.tag if side_pval.pvalue.tag else 'out')
+      si_full_label = '%s/%s(%s.%s)' % (transform_node.full_label,
+                                        side_pval.__class__.__name__,
+                                        pcollection_label,
+                                        full_label_counts[pcollection_label])
+
+      # Count the number of times the same PCollection is a side input
+      # to the same ParDo.
+      full_label_counts[pcollection_label] += 1
+
       self._add_singleton_step(
           si_label, si_full_label, side_pval.pvalue.tag,
           self._cache.get_pvalue(side_pval.pvalue))