Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/14 00:54:19 UTC

[GitHub] [beam] KevinGG opened a new pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

KevinGG opened a new pull request #12249:
URL: https://github.com/apache/beam/pull/12249


   1. Interactive Beam now creates one cache manager for each user-defined pipeline.
   2. Cache managers are created lazily, when they are first used.
   3. Cleanup of cache and in-environment states is carried out independently for each pipeline.
   4. The source recording/capture size limit is applied to the total capture size across all pipelines.
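   The four points above can be illustrated with a minimal, self-contained sketch. This is not Beam's implementation. `CacheManagerRegistry`, `FakeCacheManager`, `size_bytes()` and `capture_size_limit` are hypothetical names assumed for illustration. The registry keys managers by `str(id(pipeline))`, as the diff below does, creates them lazily, and checks the limit against the sum over all pipelines.

```python
import os
import shutil
import tempfile


class FakeCacheManager:
  """Hypothetical stand-in for a file-based cache manager."""

  def __init__(self, cache_dir):
    self.cache_dir = cache_dir

  def size_bytes(self):
    # Sum the sizes of all files written under this manager's directory.
    total = 0
    for root, _, files in os.walk(self.cache_dir):
      for name in files:
        total += os.path.getsize(os.path.join(root, name))
    return total

  def cleanup(self):
    # Remove the temporary directory backing this cache.
    shutil.rmtree(self.cache_dir, ignore_errors=True)


class CacheManagerRegistry:
  """Hypothetical per-pipeline registry of cache managers."""

  def __init__(self, capture_size_limit):
    self._cache_managers = {}
    self.capture_size_limit = capture_size_limit

  def get_cache_manager(self, pipeline, create_if_absent=False):
    key = str(id(pipeline))
    cache_manager = self._cache_managers.get(key)
    if cache_manager is None and create_if_absent:
      # Lazily create a manager the first time the pipeline needs one.
      cache_dir = tempfile.mkdtemp(suffix=key, prefix='interactive-temp-')
      cache_manager = FakeCacheManager(cache_dir)
      self._cache_managers[key] = cache_manager
    return cache_manager

  def total_capture_size(self):
    # The limit applies to the sum over all pipelines, not to each one.
    return sum(cm.size_bytes() for cm in self._cache_managers.values())

  def capture_limit_reached(self):
    return self.total_capture_size() >= self.capture_size_limit
```

   One caveat of keying by `id()`: CPython may reuse an id after an object is garbage collected, so a sketch like this assumes managers are evicted when their pipeline goes away.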
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/) | ---
   Java | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/) | ---
   XLang | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/) | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
   Portable | --- | [![Build Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] KevinGG commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454539887



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -286,24 +298,40 @@ def watching(self):
         watching.append(vars(watchable).items())
     return watching
 
-  def set_cache_manager(self, cache_manager):
-    """Sets the cache manager held by current Interactive Environment."""
-    if self._cache_manager is cache_manager:
+  def set_cache_manager(self, cache_manager, pipeline):
+    """Sets the cache manager held by current Interactive Environment for the
+    given pipeline."""
+    if self.get_cache_manager(pipeline) is cache_manager:
       # NOOP if setting to the same cache_manager.
       return
-    if self._cache_manager:
+    if self.get_cache_manager(pipeline):
       # Invoke cleanup routine when a new cache_manager is forcefully set and
       # current cache_manager is not None.
-      self.cleanup()
-      atexit.unregister(self.cleanup)
-    self._cache_manager = cache_manager
-    if self._cache_manager:
-      # Re-register cleanup routine for the new cache_manager if it's not None.
-      atexit.register(self.cleanup)
-
-  def cache_manager(self):
-    """Gets the cache manager held by current Interactive Environment."""
-    return self._cache_manager
+      self.cleanup(pipeline)
+    self._cache_managers[str(id(pipeline))] = cache_manager
+
+  def get_cache_manager(self, pipeline, create_if_absent=False):
+    """Gets the cache manager held by current Interactive Environment for the
+    given pipeline. If the pipeline is absent from the environment while
+    create_if_absent is True, creates and returns a new file based cache
+    manager for the pipeline."""
+    cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if not cache_manager and create_if_absent:
+      cache_dir = tempfile.mkdtemp(
+          suffix=str(id(pipeline)),
+          prefix='interactive-temp-',
+          dir=os.environ.get('TEST_TMPDIR', None))
+      cache_manager = cache.FileBasedCacheManager(cache_dir)
+      self._cache_managers[str(id(pipeline))] = cache_manager
+    return cache_manager
+
+  def evict_cache_manager(self, pipeline=None):
+    """Evicts the cache manager held by current Interactive Environment for the
+    given pipeline. Noop if the pipeline is absent from the environment. If no
+    pipeline is specified, evicts for all pipelines."""

Review comment:
       Thanks! Yes, I'll add a `cleanup` here.
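   A minimal sketch of what evicting with cleanup could look like, assuming a dict keyed by `str(id(pipeline))` as in the diff above. `Env` and `TrackedCacheManager` are hypothetical stand-ins; the actual body of `evict_cache_manager` is not shown in this thread.

```python
class TrackedCacheManager:
  """Hypothetical cache manager that records whether it was cleaned up."""

  def __init__(self):
    self.cleaned_up = False

  def cleanup(self):
    self.cleaned_up = True


class Env:
  """Hypothetical environment holding one cache manager per pipeline."""

  def __init__(self):
    self._cache_managers = {}

  def set_cache_manager(self, cache_manager, pipeline):
    self._cache_managers[str(id(pipeline))] = cache_manager

  def evict_cache_manager(self, pipeline=None):
    """Evicts and cleans up the cache manager for the given pipeline.

    Noop if the pipeline is absent. If no pipeline is given, evicts all.
    """
    if pipeline is None:
      keys = list(self._cache_managers)
    else:
      keys = [str(id(pipeline))]
    for key in keys:
      cache_manager = self._cache_managers.pop(key, None)
      if cache_manager is not None:
        # Release resources (e.g. temp files) before dropping the reference.
        cache_manager.cleanup()
```

   Popping before calling `cleanup()` means a failing cleanup cannot leave a half-cleaned manager registered for the pipeline.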







[GitHub] [beam] KevinGG commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454545613



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -556,19 +556,26 @@ def _process(self, pcoll):
             if not self._pin._user_pipeline:
               # Retrieve a reference to the user defined pipeline instance.
               self._pin._user_pipeline = user_pcoll.pipeline
-              # Once user_pipeline is retrieved, check if the user pipeline
-              # contains any source to cache. If so, current cache manager held
-              # by current interactive environment might get wrapped into a
-              # streaming cache, thus re-assign the reference to that cache
-              # manager.
+              # Retrieve a reference to the cache manager for the user defined
+              # pipeline instance.
+              self._pin._cache_manager = ie.current_env().get_cache_manager(

Review comment:
       L569 might not get executed at all if `background_caching_job` finds that the pipeline has no source to cache (`has_source_to_cache`). This line makes sure the cache manager is initialized (if it has never been initialized before) in both cases.
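   The ordering argument above can be sketched as follows, under stated assumptions: `CacheManagerRegistry` and `instrument()` are hypothetical simplifications, not Beam code. The point is only that the `create_if_absent=True` call sits before the conditional branch, so the manager exists whether or not the branch runs.

```python
class CacheManagerRegistry:
  """Hypothetical registry: one manager per pipeline, keyed by id()."""

  def __init__(self):
    self._cache_managers = {}
    self.created = 0

  def get_cache_manager(self, pipeline, create_if_absent=False):
    key = str(id(pipeline))
    if key not in self._cache_managers and create_if_absent:
      self._cache_managers[key] = object()  # stand-in for a real manager
      self.created += 1
    return self._cache_managers.get(key)


def instrument(registry, user_pipeline, has_source_to_cache):
  """Hypothetical instrumenting step returning the pipeline's manager."""
  # Initialize unconditionally, before any branch that may be skipped.
  cache_manager = registry.get_cache_manager(
      user_pipeline, create_if_absent=True)
  if has_source_to_cache:
    # Source-caching setup would go here; it can rely on the manager above.
    pass
  return cache_manager
```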







[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658292555


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658394934


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658371441


   Run Portable_Python PreCommit





[GitHub] [beam] aaltay commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454523384



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -556,19 +556,26 @@ def _process(self, pcoll):
             if not self._pin._user_pipeline:
               # Retrieve a reference to the user defined pipeline instance.
               self._pin._user_pipeline = user_pcoll.pipeline
-              # Once user_pipeline is retrieved, check if the user pipeline
-              # contains any source to cache. If so, current cache manager held
-              # by current interactive environment might get wrapped into a
-              # streaming cache, thus re-assign the reference to that cache
-              # manager.
+              # Retrieve a reference to the cache manager for the user defined
+              # pipeline instance.
+              self._pin._cache_manager = ie.current_env().get_cache_manager(

Review comment:
       Do you need this? You can change L569 to use `create_if_absent=True` instead.

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_environment.py
##########
@@ -286,24 +298,40 @@ def watching(self):
         watching.append(vars(watchable).items())
     return watching
 
-  def set_cache_manager(self, cache_manager):
-    """Sets the cache manager held by current Interactive Environment."""
-    if self._cache_manager is cache_manager:
+  def set_cache_manager(self, cache_manager, pipeline):
+    """Sets the cache manager held by current Interactive Environment for the
+    given pipeline."""
+    if self.get_cache_manager(pipeline) is cache_manager:
       # NOOP if setting to the same cache_manager.
       return
-    if self._cache_manager:
+    if self.get_cache_manager(pipeline):
       # Invoke cleanup routine when a new cache_manager is forcefully set and
       # current cache_manager is not None.
-      self.cleanup()
-      atexit.unregister(self.cleanup)
-    self._cache_manager = cache_manager
-    if self._cache_manager:
-      # Re-register cleanup routine for the new cache_manager if it's not None.
-      atexit.register(self.cleanup)
-
-  def cache_manager(self):
-    """Gets the cache manager held by current Interactive Environment."""
-    return self._cache_manager
+      self.cleanup(pipeline)
+    self._cache_managers[str(id(pipeline))] = cache_manager
+
+  def get_cache_manager(self, pipeline, create_if_absent=False):
+    """Gets the cache manager held by current Interactive Environment for the
+    given pipeline. If the pipeline is absent from the environment while
+    create_if_absent is True, creates and returns a new file based cache
+    manager for the pipeline."""
+    cache_manager = self._cache_managers.get(str(id(pipeline)), None)
+    if not cache_manager and create_if_absent:
+      cache_dir = tempfile.mkdtemp(
+          suffix=str(id(pipeline)),
+          prefix='interactive-temp-',
+          dir=os.environ.get('TEST_TMPDIR', None))
+      cache_manager = cache.FileBasedCacheManager(cache_dir)
+      self._cache_managers[str(id(pipeline))] = cache_manager
+    return cache_manager
+
+  def evict_cache_manager(self, pipeline=None):
+    """Evicts the cache manager held by current Interactive Environment for the
+    given pipeline. Noop if the pipeline is absent from the environment. If no
+    pipeline is specified, evicts for all pipelines."""

Review comment:
       Do you need to call `cleanup` for the evicted cache manager(s)?

##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -259,7 +247,8 @@ def read(self, pcoll, include_window_info=False):
     WindowedValues. Otherwise, return the element as itself.
     """
     key = self._pipeline_instrument.cache_key(pcoll)
-    cache_manager = ie.current_env().cache_manager()
+    cache_manager = ie.current_env().get_cache_manager(
+        self._pipeline_instrument.user_pipeline)
     if cache_manager.exists('full', key):

Review comment:
       Do you need to check that `cache_manager` is not `None`?







[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658292326


   R: @davidyan74 
   R: @aaltay 
   
   PTAL, thx!





[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658277865


   Run Python2_PVR_Flink PreCommit





[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658371339


   Run Python PreCommit





[GitHub] [beam] aaltay merged pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
aaltay merged pull request #12249:
URL: https://github.com/apache/beam/pull/12249


   





[GitHub] [beam] KevinGG commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454707919



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -556,19 +556,26 @@ def _process(self, pcoll):
             if not self._pin._user_pipeline:
               # Retrieve a reference to the user defined pipeline instance.
               self._pin._user_pipeline = user_pcoll.pipeline
-              # Once user_pipeline is retrieved, check if the user pipeline
-              # contains any source to cache. If so, current cache manager held
-              # by current interactive environment might get wrapped into a
-              # streaming cache, thus re-assign the reference to that cache
-              # manager.
+              # Retrieve a reference to the cache manager for the user defined
+              # pipeline instance.
+              self._pin._cache_manager = ie.current_env().get_cache_manager(

Review comment:
       This line is the "official" way within the module to initialize the cache manager.
   (Outside the module, each time a user appends a PTransform, a cache manager is created, if absent, for the user-defined pipeline.)
   
   TL;DR: L577 is only a fallback for when the user-defined pipeline cannot be identified from the given pipeline. It might not get executed either.
   
   There are two scenarios in which the user-defined pipeline cannot be identified:
   
   - The given pipeline is empty, without any PCollection in it.
   - The given pipeline is the user-defined pipeline itself (this does not happen internally, except in tests).
   
   In either scenario, the given pipeline is used as the user pipeline as a last-resort fallback.
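   The fallback described above can be sketched like this. `FakePipeline`, `resolve_user_pipeline()` and the `pcoll_to_user_pipeline` mapping are hypothetical; the thread does not show how Beam maps PCollections back to the user-defined pipeline, so a plain dict lookup is assumed here.

```python
class FakePipeline:
  """Hypothetical pipeline that only knows its PCollections."""

  def __init__(self, pcolls=()):
    self.pcolls = list(pcolls)


def resolve_user_pipeline(given_pipeline, pcoll_to_user_pipeline):
  """Returns the user-defined pipeline behind `given_pipeline`.

  `pcoll_to_user_pipeline` is an assumed mapping from a PCollection in the
  given (possibly copied) pipeline to the user pipeline it came from.
  """
  for pcoll in given_pipeline.pcolls:
    user_pipeline = pcoll_to_user_pipeline.get(pcoll)
    if user_pipeline is not None:
      return user_pipeline
  # Last-resort fallback: the pipeline is empty, or it is itself the user
  # pipeline, so none of its PCollections map to a different pipeline.
  return given_pipeline
```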
   
   
   







[GitHub] [beam] KevinGG commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454543220



##########
File path: sdks/python/apache_beam/runners/interactive/interactive_runner.py
##########
@@ -259,7 +247,8 @@ def read(self, pcoll, include_window_info=False):
     WindowedValues. Otherwise, return the element as itself.
     """
     key = self._pipeline_instrument.cache_key(pcoll)
-    cache_manager = ie.current_env().cache_manager()
+    cache_manager = ie.current_env().get_cache_manager(
+        self._pipeline_instrument.user_pipeline)
     if cache_manager.exists('full', key):

Review comment:
       That is not needed, since PipelineResult takes the PipelineInstrument in its constructor. The cache manager must have been initialized during instrumentation.
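   The invariant argued above can be sketched with simplified stand-ins. `CacheManagerRegistry`, `Instrument` and `Result` are hypothetical and are not Beam's `PipelineInstrument` or `PipelineResult`; they only show why a result built from an instrument never sees a missing manager. The `assert` documents the invariant rather than handling a `None` case.

```python
class CacheManagerRegistry:
  """Hypothetical registry: one manager per pipeline, keyed by id()."""

  def __init__(self):
    self._cache_managers = {}

  def get_cache_manager(self, pipeline, create_if_absent=False):
    key = str(id(pipeline))
    if key not in self._cache_managers and create_if_absent:
      self._cache_managers[key] = {}  # stand-in: a dict of cached entries
    return self._cache_managers.get(key)


class Instrument:
  """Hypothetical instrument; constructing it initializes the manager."""

  def __init__(self, registry, user_pipeline):
    self.user_pipeline = user_pipeline
    registry.get_cache_manager(user_pipeline, create_if_absent=True)


class Result:
  """Hypothetical result that can only be built from an instrument."""

  def __init__(self, registry, instrument):
    self._registry = registry
    self._instrument = instrument

  def read(self, key):
    cache_manager = self._registry.get_cache_manager(
        self._instrument.user_pipeline)
    # Invariant: the instrument passed to __init__ already created it.
    assert cache_manager is not None
    return cache_manager.get(key)
```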
   







[GitHub] [beam] KevinGG commented on pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
KevinGG commented on pull request #12249:
URL: https://github.com/apache/beam/pull/12249#issuecomment-658297809


   Reported BEAM-10483





[GitHub] [beam] aaltay commented on a change in pull request #12249: Make tracking/cleanup of cache and in-environment states per pipeline

Posted by GitBox <gi...@apache.org>.
aaltay commented on a change in pull request #12249:
URL: https://github.com/apache/beam/pull/12249#discussion_r454687033



##########
File path: sdks/python/apache_beam/runners/interactive/pipeline_instrument.py
##########
@@ -556,19 +556,26 @@ def _process(self, pcoll):
             if not self._pin._user_pipeline:
               # Retrieve a reference to the user defined pipeline instance.
               self._pin._user_pipeline = user_pcoll.pipeline
-              # Once user_pipeline is retrieved, check if the user pipeline
-              # contains any source to cache. If so, current cache manager held
-              # by current interactive environment might get wrapped into a
-              # streaming cache, thus re-assign the reference to that cache
-              # manager.
+              # Retrieve a reference to the cache manager for the user defined
+              # pipeline instance.
+              self._pin._cache_manager = ie.current_env().get_cache_manager(

Review comment:
       Does it matter, though, since L577 will initialize it anyway? Is there a case where `if not self._user_pipeline:` would not be true on the first run?



