You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/12 21:30:44 UTC

[beam] branch master updated: Fix BCJ to stop caching when the cache signature has changed.

This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a0f616  Fix BCJ to stop caching when the cache signature has changed.
     new 919ade7  Merge pull request #12414 from [BEAM-10603] Fix BCJ to stop caching when the cache signature has changed.
3a0f616 is described below

commit 3a0f6160cc789061af537ac2522959fc8dd1e187
Author: Sam Rohde <sr...@google.com>
AuthorDate: Mon Jul 27 15:38:51 2020 -0700

    Fix BCJ to stop caching when the cache signature has changed.
    
    Change-Id: I52a4f36b09e6f6899d7c59756c9702ba983e083b
---
 sdks/python/apache_beam/runners/interactive/background_caching_job.py | 4 +++-
 .../apache_beam/runners/interactive/background_caching_job_test.py    | 3 +++
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index 1b05285..9346adb 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -182,7 +182,9 @@ def is_cache_complete(pipeline_id):
   cache_changed = is_source_to_cache_changed(
       user_pipeline, update_cached_source_signature=False)
 
-  return is_done and not cache_changed
+  # Stop reading from the cache if the background job is done, or if the cache
+  # signature changed and therefore a new background caching job is required.
+  return is_done or cache_changed
 
 
 def has_source_to_cache(user_pipeline):
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
index 803f6ce..6848912 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
@@ -197,11 +197,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
     ie.current_env().set_cached_source_signature(
         pipeline, bcj.extract_source_to_cache_signature(pipeline))
 
+    self.assertFalse(bcj.is_cache_complete(str(id(pipeline))))
+
     with cell:  # Cell 2
       read_bar = pipeline | 'Read' >> beam.io.ReadFromPubSub(
           subscription=_BAR_PUBSUB_SUB)
       ib.watch({'read_bar': read_bar})
 
+    self.assertTrue(bcj.is_cache_complete(str(id(pipeline))))
     self.assertTrue(bcj.is_source_to_cache_changed(pipeline))
 
   @patch('IPython.get_ipython', new_callable=mock_get_ipython)