You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pa...@apache.org on 2020/08/12 21:30:44 UTC
[beam] branch master updated: Fix BCJ to stop caching when the cache signature has changed.
This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 3a0f616 Fix BCJ to stop caching when the cache signature has changed.
new 919ade7 Merge pull request #12414 from [BEAM-10603] Fix BCJ to stop caching when the cache signature has changed.
3a0f616 is described below
commit 3a0f6160cc789061af537ac2522959fc8dd1e187
Author: Sam Rohde <sr...@google.com>
AuthorDate: Mon Jul 27 15:38:51 2020 -0700
Fix BCJ to stop caching when the cache signature has changed.
Change-Id: I52a4f36b09e6f6899d7c59756c9702ba983e083b
---
sdks/python/apache_beam/runners/interactive/background_caching_job.py | 4 +++-
.../apache_beam/runners/interactive/background_caching_job_test.py | 3 +++
2 files changed, 6 insertions(+), 1 deletion(-)
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job.py b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
index 1b05285..9346adb 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job.py
@@ -182,7 +182,9 @@ def is_cache_complete(pipeline_id):
cache_changed = is_source_to_cache_changed(
user_pipeline, update_cached_source_signature=False)
- return is_done and not cache_changed
+ # Stop reading from the cache if the background job is done or if the underlying
+ # cache signature has changed, which requires a new background caching job.
+ return is_done or cache_changed
def has_source_to_cache(user_pipeline):
diff --git a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
index 803f6ce..6848912 100644
--- a/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
+++ b/sdks/python/apache_beam/runners/interactive/background_caching_job_test.py
@@ -197,11 +197,14 @@ class BackgroundCachingJobTest(unittest.TestCase):
ie.current_env().set_cached_source_signature(
pipeline, bcj.extract_source_to_cache_signature(pipeline))
+ self.assertFalse(bcj.is_cache_complete(str(id(pipeline))))
+
with cell: # Cell 2
read_bar = pipeline | 'Read' >> beam.io.ReadFromPubSub(
subscription=_BAR_PUBSUB_SUB)
ib.watch({'read_bar': read_bar})
+ self.assertTrue(bcj.is_cache_complete(str(id(pipeline))))
self.assertTrue(bcj.is_source_to_cache_changed(pipeline))
@patch('IPython.get_ipython', new_callable=mock_get_ipython)