You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ib...@apache.org on 2020/09/09 16:16:05 UTC

[beam] branch master updated: [BEAM-10860] avoid dictionary size change when shutting down BundleProcessorCache

This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new b011a22  [BEAM-10860] avoid dictionary size change when shutting down BundleProcessorCache
     new 1377299  Merge pull request #12783 from lazylynx/avoid_dictionary_size_change_when_BundleProcessorCache_shutdown
b011a22 is described below

commit b011a229e9a0134bcc94b9b75465afa6350e697c
Author: yoshiki.obata <yo...@gmail.com>
AuthorDate: Tue Sep 8 23:16:54 2020 +0900

    [BEAM-10860] avoid dictionary size change when shutting down BundleProcessorCache
---
 sdks/python/apache_beam/runners/worker/sdk_worker.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index cf35c61..1a8e093 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -429,9 +429,8 @@ class BundleProcessorCache(object):
       self.periodic_shutdown.join()
       self.periodic_shutdown = None
 
-    for instruction_id in self.active_bundle_processors:
-      self.active_bundle_processors[instruction_id][1].shutdown()
-      del self.active_bundle_processors[instruction_id]
+    for instruction_id in list(self.active_bundle_processors.keys()):
+      self.discard(instruction_id)
     for cached_bundle_processors in self.cached_bundle_processors.values():
       BundleProcessorCache._shutdown_cached_bundle_processors(
           cached_bundle_processors)