You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@beam.apache.org by lc...@apache.org on 2020/06/01 22:16:39 UTC

[beam] branch master updated: [BEAM-10158] Use a shared unbounded thread pool within Beam Python. (#11867)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 89fc35b  [BEAM-10158] Use a shared unbounded thread pool within Beam Python. (#11867)
89fc35b is described below

commit 89fc35b87d5dc074d25d60a97bb96d71e04be283
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Mon Jun 1 15:16:19 2020 -0700

    [BEAM-10158] Use a shared unbounded thread pool within Beam Python. (#11867)
    
    * [BEAM-10158] Use a shared unbounded thread pool within Beam Python.
    
    * fixup! Fix lint errors
    
    * fixup! Address PR comments
    
    * fixup! fix lint
---
 .../runners/portability/artifact_service_test.py          |  6 +++---
 .../runners/portability/expansion_service_test.py         |  4 ++--
 .../runners/portability/fn_api_runner/fn_runner.py        |  4 ++--
 .../runners/portability/fn_api_runner/worker_handlers.py  | 14 +++++++++-----
 .../apache_beam/runners/portability/local_job_service.py  |  7 ++++---
 .../runners/portability/portable_stager_test.py           |  4 ++--
 sdks/python/apache_beam/runners/worker/data_plane_test.py |  4 ++--
 .../python/apache_beam/runners/worker/log_handler_test.py |  4 ++--
 sdks/python/apache_beam/runners/worker/sdk_worker.py      |  9 +++++----
 sdks/python/apache_beam/runners/worker/sdk_worker_test.py |  4 ++--
 .../python/apache_beam/runners/worker/worker_pool_main.py |  5 +++--
 .../apache_beam/runners/worker/worker_status_test.py      |  4 ++--
 sdks/python/apache_beam/utils/thread_pool_executor.py     | 13 +++++++++++++
 .../python/apache_beam/utils/thread_pool_executor_test.py | 15 +++++++++++++++
 14 files changed, 66 insertions(+), 31 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/artifact_service_test.py b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
index 14993dc..6a6768a 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
@@ -44,7 +44,7 @@ from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners.portability import artifact_service
 from apache_beam.utils import proto_utils
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 
 class AbstractArtifactServiceTest(unittest.TestCase):
@@ -83,7 +83,7 @@ class AbstractArtifactServiceTest(unittest.TestCase):
     self._run_staging(self._service, self._service)
 
   def test_with_grpc(self):
-    server = grpc.server(UnboundedThreadPoolExecutor())
+    server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     try:
       beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
           self._service, server)
@@ -225,7 +225,7 @@ class AbstractArtifactServiceTest(unittest.TestCase):
               self._service, tokens[session(index)], name(index)))
 
     # pylint: disable=range-builtin-not-iterating
-    pool = UnboundedThreadPoolExecutor()
+    pool = thread_pool_executor.shared_unbounded_instance()
     sessions = set(pool.map(put, range(100)))
     tokens = dict(pool.map(commit, sessions))
     # List forces materialization.
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
index 6a7ab57..a1b531d 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
@@ -36,7 +36,7 @@ from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigur
 from apache_beam.runners.portability import expansion_service
 from apache_beam.transforms import ptransform
 from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 # This script provides an expansion service and example ptransforms for running
 # external transform test cases. See external_test.py for details.
@@ -296,7 +296,7 @@ def main(unused_argv):
       '-p', '--port', type=int, help='port on which to serve the job api')
   options = parser.parse_args()
   global server
-  server = grpc.server(UnboundedThreadPoolExecutor())
+  server = grpc.server(thread_pool_executor.shared_unbounded_instance())
   beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
       expansion_service.ExpansionServiceServicer(
           PipelineOptions(
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 58db57c..c2147ce 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -64,7 +64,7 @@ from apache_beam.runners.portability.fn_api_runner.worker_handlers import Worker
 from apache_beam.transforms import environments
 from apache_beam.utils import profiler
 from apache_beam.utils import proto_utils
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 if TYPE_CHECKING:
   from apache_beam.pipeline import Pipeline
@@ -925,7 +925,7 @@ class ParallelBundleManager(BundleManager):
           expected_output_timers,
           dry_run)
 
-    with UnboundedThreadPoolExecutor() as executor:
+    with thread_pool_executor.shared_unbounded_instance() as executor:
       for result, split_result in executor.map(execute, zip(part_inputs,  # pylint: disable=zip-builtin-not-iterating
                                                             timer_inputs)):
         split_result_list += split_result
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 6808b47..6a27656 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -63,7 +63,7 @@ from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
 from apache_beam.runners.worker.sdk_worker import _Future
 from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.utils import proto_utils
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 # State caching is enabled in the fn_api_runner for testing, except for one
 # test which runs without state caching (FnApiRunnerTestWithDisabledCaching).
@@ -441,7 +441,8 @@ class GrpcServer(object):
     # type: (...) -> None
     self.state = state
     self.provision_info = provision_info
-    self.control_server = grpc.server(UnboundedThreadPoolExecutor())
+    self.control_server = grpc.server(
+        thread_pool_executor.shared_unbounded_instance())
     self.control_port = self.control_server.add_insecure_port('[::]:0')
     self.control_address = 'localhost:%s' % self.control_port
 
@@ -451,11 +452,13 @@ class GrpcServer(object):
     no_max_message_sizes = [("grpc.max_receive_message_length", -1),
                             ("grpc.max_send_message_length", -1)]
     self.data_server = grpc.server(
-        UnboundedThreadPoolExecutor(), options=no_max_message_sizes)
+        thread_pool_executor.shared_unbounded_instance(),
+        options=no_max_message_sizes)
     self.data_port = self.data_server.add_insecure_port('[::]:0')
 
     self.state_server = grpc.server(
-        UnboundedThreadPoolExecutor(), options=no_max_message_sizes)
+        thread_pool_executor.shared_unbounded_instance(),
+        options=no_max_message_sizes)
     self.state_port = self.state_server.add_insecure_port('[::]:0')
 
     self.control_handler = BeamFnControlServicer(worker_manager)
@@ -493,7 +496,8 @@ class GrpcServer(object):
         GrpcStateServicer(state), self.state_server)
 
     self.logging_server = grpc.server(
-        UnboundedThreadPoolExecutor(), options=no_max_message_sizes)
+        thread_pool_executor.shared_unbounded_instance(),
+        options=no_max_message_sizes)
     self.logging_port = self.logging_server.add_insecure_port('[::]:0')
     beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
         BasicLoggingService(), self.logging_server)
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index b99bfeb..dcdc442 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -48,7 +48,7 @@ from apache_beam.runners.portability import abstract_job_service
 from apache_beam.runners.portability import artifact_service
 from apache_beam.runners.portability.fn_api_runner import fn_runner
 from apache_beam.runners.portability.fn_api_runner import worker_handlers
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 if TYPE_CHECKING:
   from google.protobuf import struct_pb2  # pylint: disable=ungrouped-imports
@@ -141,7 +141,7 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
     return 'localhost'
 
   def start_grpc_server(self, port=0):
-    self._server = grpc.server(UnboundedThreadPoolExecutor())
+    self._server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     port = self._server.add_insecure_port(
         '%s:%d' % (self.get_bind_address(), port))
     beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
@@ -194,7 +194,8 @@ class SubprocessSdkWorker(object):
     self._worker_id = worker_id
 
   def run(self):
-    logging_server = grpc.server(UnboundedThreadPoolExecutor())
+    logging_server = grpc.server(
+        thread_pool_executor.shared_unbounded_instance())
     logging_port = logging_server.add_insecure_port('[::]:0')
     logging_server.start()
     logging_servicer = BeamFnLoggingServicer()
diff --git a/sdks/python/apache_beam/runners/portability/portable_stager_test.py b/sdks/python/apache_beam/runners/portability/portable_stager_test.py
index c7a8e1c..44fb498 100644
--- a/sdks/python/apache_beam/runners/portability/portable_stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_stager_test.py
@@ -36,7 +36,7 @@ import grpc
 from apache_beam.portability.api import beam_artifact_api_pb2
 from apache_beam.portability.api import beam_artifact_api_pb2_grpc
 from apache_beam.runners.portability import portable_stager
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 
 class PortableStagerTest(unittest.TestCase):
@@ -58,7 +58,7 @@ class PortableStagerTest(unittest.TestCase):
           describing the name of the artifacts in local temp folder and desired
           name in staging location.
     """
-    server = grpc.server(UnboundedThreadPoolExecutor())
+    server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     staging_service = TestLocalFileSystemLegacyArtifactStagingServiceServicer(
         self._remote_dir)
     beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index a4922a5..4f09df3 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -34,7 +34,7 @@ from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import data_plane
 from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
 from apache_beam.testing.util import timeout
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 
 class DataChannelTest(unittest.TestCase):
@@ -56,7 +56,7 @@ class DataChannelTest(unittest.TestCase):
     data_channel_service = \
       data_servicer.get_conn_by_worker_id(worker_id)
 
-    server = grpc.server(UnboundedThreadPoolExecutor())
+    server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(data_servicer, server)
     test_port = server.add_insecure_port('[::]:0')
     server.start()
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index 4d9291d..80fe543 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -32,7 +32,7 @@ from apache_beam.portability.api import endpoints_pb2
 from apache_beam.runners.common import NameContext
 from apache_beam.runners.worker import log_handler
 from apache_beam.runners.worker import statesampler
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -52,7 +52,7 @@ class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
 class FnApiLogRecordHandlerTest(unittest.TestCase):
   def setUp(self):
     self.test_logging_service = BeamFnLoggingServicer()
-    self.server = grpc.server(UnboundedThreadPoolExecutor())
+    self.server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
         self.test_logging_service, self.server)
     self.test_port = self.server.add_insecure_port('[::]:0')
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index eed471d..a58dd2f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -64,7 +64,7 @@ from apache_beam.runners.worker.data_plane import PeriodicThread
 from apache_beam.runners.worker.statecache import StateCache
 from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
 from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 if TYPE_CHECKING:
   from apache_beam.portability.api import endpoints_pb2
@@ -186,10 +186,11 @@ class SdkHarness(object):
     else:
       self._status_handler = None
 
-    # TODO(BEAM-8998) use common UnboundedThreadPoolExecutor to process bundle
-    #  progress once dataflow runner's excessive progress polling is removed.
+    # TODO(BEAM-8998) use common
+    # thread_pool_executor.shared_unbounded_instance() to process bundle
+    # progress once dataflow runner's excessive progress polling is removed.
     self._report_progress_executor = futures.ThreadPoolExecutor(max_workers=1)
-    self._worker_thread_pool = UnboundedThreadPoolExecutor()
+    self._worker_thread_pool = thread_pool_executor.shared_unbounded_instance()
     self._responses = queue.Queue(
     )  # type: queue.Queue[beam_fn_api_pb2.InstructionResponse]
     _LOGGER.info('Initializing SDKHarness with unbounded number of workers.')
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index a7c3bc3..d9b8083 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -38,7 +38,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.portability.api import metrics_pb2
 from apache_beam.runners.worker import sdk_worker
 from apache_beam.runners.worker import statecache
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -102,7 +102,7 @@ class SdkWorkerTest(unittest.TestCase):
 
       test_controller = BeamFnControlServicer(requests)
 
-      server = grpc.server(UnboundedThreadPoolExecutor())
+      server = grpc.server(thread_pool_executor.shared_unbounded_instance())
       beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
           test_controller, server)
       test_port = server.add_insecure_port("[::]:0")
diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index b18961a..f4a4728 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -48,7 +48,7 @@ import grpc
 from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker import sdk_worker
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 _LOGGER = logging.getLogger(__name__)
 
@@ -78,7 +78,8 @@ class BeamFnExternalWorkerPoolServicer(
       container_executable=None  # type: Optional[str]
   ):
     # type: (...) -> Tuple[str, grpc.Server]
-    worker_server = grpc.server(UnboundedThreadPoolExecutor())
+    worker_server = grpc.server(
+        thread_pool_executor.shared_unbounded_instance())
     worker_address = 'localhost:%s' % worker_server.add_insecure_port(
         '[::]:%s' % port)
     worker_pool = cls(
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 0872de4..fea0d73 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -28,7 +28,7 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
 from apache_beam.testing.util import timeout
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
 
 
 class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer):
@@ -52,7 +52,7 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
   def setUp(self):
     self.num_request = 3
     self.test_status_service = BeamFnStatusServicer(self.num_request)
-    self.server = grpc.server(UnboundedThreadPoolExecutor())
+    self.server = grpc.server(thread_pool_executor.shared_unbounded_instance())
     beam_fn_api_pb2_grpc.add_BeamFnWorkerStatusServicer_to_server(
         self.test_status_service, self.server)
     self.test_port = self.server.add_insecure_port('[::]:0')
diff --git a/sdks/python/apache_beam/utils/thread_pool_executor.py b/sdks/python/apache_beam/utils/thread_pool_executor.py
index a5443ed..c111b9a 100644
--- a/sdks/python/apache_beam/utils/thread_pool_executor.py
+++ b/sdks/python/apache_beam/utils/thread_pool_executor.py
@@ -134,3 +134,16 @@ class UnboundedThreadPoolExecutor(_base.Executor):
       if wait:
         for worker in self._workers:
           worker.join()
+
+
+class _SharedUnboundedThreadPoolExecutor(UnboundedThreadPoolExecutor):
+  def shutdown(self, wait=True):
+    # Prevent shutting down the shared thread pool
+    pass
+
+
+_SHARED_UNBOUNDED_THREAD_POOL_EXECUTOR = _SharedUnboundedThreadPoolExecutor()
+
+
+def shared_unbounded_instance():
+  return _SHARED_UNBOUNDED_THREAD_POOL_EXECUTOR
diff --git a/sdks/python/apache_beam/utils/thread_pool_executor_test.py b/sdks/python/apache_beam/utils/thread_pool_executor_test.py
index d9bbae4..b9251ca 100644
--- a/sdks/python/apache_beam/utils/thread_pool_executor_test.py
+++ b/sdks/python/apache_beam/utils/thread_pool_executor_test.py
@@ -30,6 +30,7 @@ import unittest
 # patches unittest.TestCase to be python3 compatible
 import future.tests.base  # pylint: disable=unused-import
 
+from apache_beam.utils import thread_pool_executor
 from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
 
 
@@ -111,6 +112,20 @@ class UnboundedThreadPoolExecutorTest(unittest.TestCase):
     with self._lock:
       self.assertEqual(5, len(self._worker_idents))
 
+  def test_shared_shutdown_does_nothing(self):
+    thread_pool_executor.shared_unbounded_instance().shutdown()
+
+    futures = []
+    with thread_pool_executor.shared_unbounded_instance() as executor:
+      for _ in range(0, 5):
+        futures.append(executor.submit(self.append_and_sleep, 0.01))
+
+    for future in futures:
+      future.result(timeout=10)
+
+    with self._lock:
+      self.assertEqual(5, len(self._worker_idents))
+
 
 if __name__ == '__main__':
   unittest.main()