You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ji...@apache.org on 2020/02/25 09:22:00 UTC

[beam] branch BEAM-8618-PR created (now 4c288d1)

This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a change to branch BEAM-8618-PR
in repository https://gitbox.apache.org/repos/asf/beam.git.


      at 4c288d1  fixup! [BEAM-8618] Tear down unused DoFns periodically in Python SDK harness.

This branch includes the following new commits:

     new 4c288d1  fixup! [BEAM-8618] Tear down unused DoFns periodically in Python SDK harness.

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[beam] 01/01: fixup! [BEAM-8618] Tear down unused DoFns periodically in Python SDK harness.

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch BEAM-8618-PR
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 4c288d111de9b6fceb62aee67b3a0f06fce7cfb4
Author: sunjincheng121 <su...@gmail.com>
AuthorDate: Tue Feb 25 17:21:29 2020 +0800

    fixup! [BEAM-8618] Tear down unused DoFns periodically in Python SDK harness.
---
 sdks/python/apache_beam/runners/worker/sdk_worker.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index 7e33705..186c940 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -289,7 +289,7 @@ class BundleProcessorCache(object):
     }  # type: Dict[str, Tuple[str, bundle_processor.BundleProcessor]]
     self.cached_bundle_processors = collections.defaultdict(
         list)  # type: DefaultDict[str, List[bundle_processor.BundleProcessor]]
-    self.cached_bundle_processors_last_access_time = \
+    self.last_access_times = \
         collections.defaultdict(float)  # type: DefaultDict[str, float]
     self._schedule_periodic_shutdown()
 
@@ -348,7 +348,7 @@ class BundleProcessorCache(object):
     """
     descriptor_id, processor = self.active_bundle_processors.pop(instruction_id)
     processor.reset()
-    self.cached_bundle_processors_last_access_time[descriptor_id] = time.time()
+    self.last_access_times[descriptor_id] = time.time()
     self.cached_bundle_processors[descriptor_id].append(processor)
 
   def shutdown(self):
@@ -369,8 +369,7 @@ class BundleProcessorCache(object):
 
   def _schedule_periodic_shutdown(self):
     def shutdown_inactive_bundle_processors():
-      items = self.cached_bundle_processors_last_access_time.items()
-      for descriptor_id, last_access_time in items:
+      for descriptor_id, last_access_time in self.last_access_times.items():
         if (time.time() - last_access_time >
             DEFAULT_BUNDLE_PROCESSOR_CACHE_SHUTDOWN_THRESHOLD_S):
           BundleProcessorCache._shutdown_cached_bundle_processors(