You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/09/09 16:16:05 UTC
[beam] branch master updated: [BEAM-10860] avoid dictionary size change when shutting down BundleProcessorCache
This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b011a22 [BEAM-10860] avoid dictionary size change when shutting down BundleProcessorCache
new 1377299 Merge pull request #12783 from lazylynx/avoid_dictionary_size_change_when_BundleProcessorCache_shutdown
b011a22 is described below
commit b011a229e9a0134bcc94b9b75465afa6350e697c
Author: yoshiki.obata <yo...@gmail.com>
AuthorDate: Tue Sep 8 23:16:54 2020 +0900
[BEAM-10860] avoid dictionary size change when shutting down BundleProcessorCache
---
sdks/python/apache_beam/runners/worker/sdk_worker.py | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index cf35c61..1a8e093 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -429,9 +429,8 @@ class BundleProcessorCache(object):
self.periodic_shutdown.join()
self.periodic_shutdown = None
- for instruction_id in self.active_bundle_processors:
- self.active_bundle_processors[instruction_id][1].shutdown()
- del self.active_bundle_processors[instruction_id]
+ for instruction_id in list(self.active_bundle_processors.keys()):
+ self.discard(instruction_id)
for cached_bundle_processors in self.cached_bundle_processors.values():
BundleProcessorCache._shutdown_cached_bundle_processors(
cached_bundle_processors)