You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink that was lost in the conversion to plain text.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/14 01:23:10 UTC
[beam] branch master updated: Fix bug that evicting computed
PCollections was changing list while iterating.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 29cd482 Fix bug that evicting computed PCollections was changing list while iterating.
new 9ab86d6 Merge pull request #12558 from [BEAM-10695] Fix InteractiveEnvironment cannot evict PCollection
29cd482 is described below
commit 29cd48280172ad332f107eda699813f960bcb1d3
Author: Sam Rohde <sr...@google.com>
AuthorDate: Wed Aug 12 14:19:14 2020 -0700
Fix bug that evicting computed PCollections was changing list while iterating.
Change-Id: I271efeef53f99c8083a6d37b89085cddd63bf56a
---
.../runners/interactive/interactive_environment.py | 4 +++-
.../interactive/interactive_environment_test.py | 20 ++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 4363d17..1c69979 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -501,9 +501,11 @@ class InteractiveEnvironment(object):
is specified, evicts for all pipelines.
"""
if pipeline:
+ discarded = set()
for pcoll in self._computed_pcolls:
if pcoll.pipeline is pipeline:
- self._computed_pcolls.discard(pcoll)
+ discarded.add(pcoll)
+ self._computed_pcolls -= discarded
else:
self._computed_pcolls = set()
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index 6650c63..2360aa8 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -265,6 +265,26 @@ class InteractiveEnvironmentTest(unittest.TestCase):
ie.current_env().track_user_pipelines()
mocked_cleanup.assert_called_once()
+ def test_evict_pcollections(self):
+ """Tests the evicton logic in the InteractiveEnvironment."""
+
+ # Create two PCollection, one that will be evicted and another that won't.
+ p_to_evict = beam.Pipeline()
+ to_evict = p_to_evict | beam.Create([])
+
+ p_not_evicted = beam.Pipeline()
+ not_evicted = p_not_evicted | beam.Create([])
+
+ # Mark the PCollections as computed because the eviction logic only works
+ # on computed PCollections.
+ ie.current_env().mark_pcollection_computed([to_evict, not_evicted])
+ self.assertSetEqual(
+ ie.current_env().computed_pcollections, {to_evict, not_evicted})
+
+ # Evict the PCollection and then check that the other PCollection is safe.
+ ie.current_env().evict_computed_pcollections(p_to_evict)
+ self.assertSetEqual(ie.current_env().computed_pcollections, {not_evicted})
+
if __name__ == '__main__':
unittest.main()