You are viewing a plain-text version of this content; the canonical link is not included in this version.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/14 17:56:40 UTC

[GitHub] [beam] KevinGG commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

KevinGG commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454539887



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -286,24 +298,40 @@ def watching(self):
         watching.append(vars(watchable).items())
     return watching
 
-  def set_cache_manager(self, cache_manager):
-    """Sets the cache manager held by current Interactive Environment."""
-    if self._cache_manager is cache_manager:
+  def set_cache_manager(self, cache_manager, pipeline):
+    """Sets the cache manager held by current Interactive Environment for the
+    given pipeline."""
+    if self.get_cache_manager(pipeline) is cache_manager:
       # NOOP if setting to the same cache_manager.
       return
-    if self._cache_manager:
+    if self.get_cache_manager(pipeline):
       # Invoke cleanup routine when a new cache_manager is forcefully set and
       # current cache_manager is not None.
-      self.cleanup()
-      atexit.unregister(self.cleanup)
-    self._cache_manager = cache_manager
-    if self._cache_manager:
-      # Re-register cleanup routine for the new cache_manager if it's not None.
-      atexit.register(self.cleanup)
-
-  def cache_manager(self):
-    """Gets the cache manager held by current Interactive Environment."""
-    return self._cache_manager
+      self.cleanup(pipeline)
+    self._cache_managers[str(id(pipeline))] = cache_manager
+
+  def get_cache_manager(self, pipeline, create_if_absent=False):
+    """Gets the cache manager held by current Interactive Environment for the
+    given pipeline. If the pipeline is absent from the environment while
+    create_if_absent is True, creates and returns a new file based cache
+    manager for the pipeline."""
+    cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if not cache_manager and create_if_absent:
+      cache_dir = tempfile.mkdtemp(
+          suffix=str(id(pipeline)),
+          prefix='interactive-temp-',
+          dir=os.environ.get('TEST_TMPDIR', None))
+      cache_manager = cache.FileBasedCacheManager(cache_dir)
+      self._cache_managers[str(id(pipeline))] = cache_manager
+    return cache_manager
+
+  def evict_cache_manager(self, pipeline=None):
+    """Evicts the cache manager held by current Interactive Environment for the
+    given pipeline. Noop if the pipeline is absent from the environment. If no
+    pipeline is specified, evicts for all pipelines."""

Review comment:
       Thanks! Yes, I'll add a `cleanup` here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org