You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/14 01:23:10 UTC

[beam] branch master updated: Fix bug that evicting computed PCollections was changing list while iterating.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 29cd482  Fix bug that evicting computed PCollections was changing list while iterating.
     new 9ab86d6  Merge pull request #12558 from [BEAM-10695] Fix InteractiveEnvironment cannot evict PCollection
29cd482 is described below

commit 29cd48280172ad332f107eda699813f960bcb1d3
Author: Sam Rohde <sr...@google.com>
AuthorDate: Wed Aug 12 14:19:14 2020 -0700

    Fix bug that evicting computed PCollections was changing list while iterating.
    
    Change-Id: I271efeef53f99c8083a6d37b89085cddd63bf56a
---
 .../runners/interactive/interactive_environment.py   |  4 +++-
 .../interactive/interactive_environment_test.py      | 20 ++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment.py b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
index 4363d17..1c69979 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment.py
@@ -501,9 +501,11 @@ class InteractiveEnvironment(object):
     is specified, evicts for all pipelines.
     """
     if pipeline:
+      discarded = set()
       for pcoll in self._computed_pcolls:
         if pcoll.pipeline is pipeline:
-          self._computed_pcolls.discard(pcoll)
+          discarded.add(pcoll)
+      self._computed_pcolls -= discarded
     else:
       self._computed_pcolls = set()
 
diff --git a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
index 6650c63..2360aa8 100644
--- a/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
+++ b/sdks/python/apache_beam/runners/interactive/interactive_environment_test.py
@@ -265,6 +265,26 @@ class InteractiveEnvironmentTest(unittest.TestCase):
     ie.current_env().track_user_pipelines()
     mocked_cleanup.assert_called_once()
 
+  def test_evict_pcollections(self):
+    """Tests the eviction logic in the InteractiveEnvironment."""
+
+    # Create two PCollections: one that will be evicted and one that won't.
+    p_to_evict = beam.Pipeline()
+    to_evict = p_to_evict | beam.Create([])
+
+    p_not_evicted = beam.Pipeline()
+    not_evicted = p_not_evicted | beam.Create([])
+
+    # Mark the PCollections as computed because the eviction logic only works
+    # on computed PCollections.
+    ie.current_env().mark_pcollection_computed([to_evict, not_evicted])
+    self.assertSetEqual(
+        ie.current_env().computed_pcollections, {to_evict, not_evicted})
+
+    # Evict one pipeline's PCollection and check the other one is retained.
+    ie.current_env().evict_computed_pcollections(p_to_evict)
+    self.assertSetEqual(ie.current_env().computed_pcollections, {not_evicted})
+
 
 if __name__ == '__main__':
   unittest.main()