You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/07/12 19:16:13 UTC

[1/2] incubator-beam git commit: Closes #639

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 77f90ff15 -> 95a591e05


Closes #639


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/95a591e0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/95a591e0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/95a591e0

Branch: refs/heads/python-sdk
Commit: 95a591e053bce7bff09bc06bb468186e3def8479
Parents: 77f90ff 246fda5
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 12 12:15:25 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jul 12 12:15:25 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Fixes bug due to accessing cached pvalues multiple times.

Posted by ro...@apache.org.
Fixes bug due to accessing cached pvalues multiple times.

The self._cache.get_pvalue lookup is not an idempotent operation, as
each call decrements the refcount and may even delete the pvalue from
the cache.  Instead, we do the lookup once per side input, storing the
resulting step name in a map that is consulted for later references.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/246fda51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/246fda51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/246fda51

Branch: refs/heads/python-sdk
Commit: 246fda517fd7d6abdbbd47882657e66c34a4ac51
Parents: 77f90ff
Author: Robert Bradshaw <ro...@google.com>
Authored: Tue Jul 12 10:43:29 2016 -0700
Committer: Robert Bradshaw <ro...@google.com>
Committed: Tue Jul 12 12:15:25 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/dataflow_runner.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/246fda51/sdks/python/apache_beam/runners/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 24edb05..5a3f6a5 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -361,7 +361,11 @@ class DataflowPipelineRunner(PipelineRunner):
 
     # Attach side inputs.
     si_dict = {}
-    lookup_label = lambda side_pval: self._cache.get_pvalue(side_pval).step_name
+    # We must call self._cache.get_pvalue exactly once due to refcounting.
+    si_labels = {}
+    for side_pval in transform_node.side_inputs:
+      si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name
+    lookup_label = lambda side_pval: si_labels[side_pval]
     for side_pval in transform_node.side_inputs:
       assert isinstance(side_pval, PCollectionView)
       si_label = lookup_label(side_pval)