You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/12 19:16:14 UTC

[2/2] incubator-beam git commit: Fixes bug due to accessing cached pvalues multiple times.

Fixes bug due to accessing cached pvalues multiple times.

The cache lookup (self._cache.get_pvalue) is not an idempotent
operation: each call decrements the pvalue's refcount and may even
delete the pvalue from the cache. Instead, we do the lookup once per
side input, storing the result in a map that later code reads from.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/246fda51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/246fda51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/246fda51

Branch: refs/heads/python-sdk
Commit: 246fda517fd7d6abdbbd47882657e66c34a4ac51
Parents: 77f90ff
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 12 10:43:29 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jul 12 12:15:25 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/246fda51/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 24edb05..5a3f6a5 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -361,7 +361,11 @@ class DataflowPipelineRunner(PipelineRunner):
 
     # Attach side inputs.
     si_dict = {}
-    lookup_label = lambda side_pval: self._cache.get_pvalue(side_pval).step_name
+    # We must call self._cache.get_pvalue exactly once due to refcounting.
+    si_labels = {}
+    for side_pval in transform_node.side_inputs:
+      si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
+    lookup_label = lambda side_pval: si_labels[side_pval]
     for side_pval in transform_node.side_inputs:
       assert isinstance(side_pval, PCollectionView)
       si_label = lookup_label(side_pval)