You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/06/01 22:16:39 UTC
[beam] branch master updated: [BEAM-10158] Use a shared unbounded thread pool within Beam Python. (#11867)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 89fc35b [BEAM-10158] Use a shared unbounded thread pool within Beam Python. (#11867)
89fc35b is described below
commit 89fc35b87d5dc074d25d60a97bb96d71e04be283
Author: Lukasz Cwik <lu...@gmail.com>
AuthorDate: Mon Jun 1 15:16:19 2020 -0700
[BEAM-10158] Use a shared unbounded thread pool within Beam Python. (#11867)
* [BEAM-10158] Use a shared unbounded thread pool within Beam Python.
* fixup! Fix lint errors
* fixup! Address PR comments
* fixup! fix lint
---
.../runners/portability/artifact_service_test.py | 6 +++---
.../runners/portability/expansion_service_test.py | 4 ++--
.../runners/portability/fn_api_runner/fn_runner.py | 4 ++--
.../runners/portability/fn_api_runner/worker_handlers.py | 14 +++++++++-----
.../apache_beam/runners/portability/local_job_service.py | 7 ++++---
.../runners/portability/portable_stager_test.py | 4 ++--
sdks/python/apache_beam/runners/worker/data_plane_test.py | 4 ++--
.../python/apache_beam/runners/worker/log_handler_test.py | 4 ++--
sdks/python/apache_beam/runners/worker/sdk_worker.py | 9 +++++----
sdks/python/apache_beam/runners/worker/sdk_worker_test.py | 4 ++--
.../python/apache_beam/runners/worker/worker_pool_main.py | 5 +++--
.../apache_beam/runners/worker/worker_status_test.py | 4 ++--
sdks/python/apache_beam/utils/thread_pool_executor.py | 13 +++++++++++++
.../python/apache_beam/utils/thread_pool_executor_test.py | 15 +++++++++++++++
14 files changed, 66 insertions(+), 31 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/artifact_service_test.py b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
index 14993dc..6a6768a 100644
--- a/sdks/python/apache_beam/runners/portability/artifact_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/artifact_service_test.py
@@ -44,7 +44,7 @@ from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability import artifact_service
from apache_beam.utils import proto_utils
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
class AbstractArtifactServiceTest(unittest.TestCase):
@@ -83,7 +83,7 @@ class AbstractArtifactServiceTest(unittest.TestCase):
self._run_staging(self._service, self._service)
def test_with_grpc(self):
- server = grpc.server(UnboundedThreadPoolExecutor())
+ server = grpc.server(thread_pool_executor.shared_unbounded_instance())
try:
beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
self._service, server)
@@ -225,7 +225,7 @@ class AbstractArtifactServiceTest(unittest.TestCase):
self._service, tokens[session(index)], name(index)))
# pylint: disable=range-builtin-not-iterating
- pool = UnboundedThreadPoolExecutor()
+ pool = thread_pool_executor.shared_unbounded_instance()
sessions = set(pool.map(put, range(100)))
tokens = dict(pool.map(commit, sessions))
# List forces materialization.
diff --git a/sdks/python/apache_beam/runners/portability/expansion_service_test.py b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
index 6a7ab57..a1b531d 100644
--- a/sdks/python/apache_beam/runners/portability/expansion_service_test.py
+++ b/sdks/python/apache_beam/runners/portability/expansion_service_test.py
@@ -36,7 +36,7 @@ from apache_beam.portability.api.external_transforms_pb2 import ExternalConfigur
from apache_beam.runners.portability import expansion_service
from apache_beam.transforms import ptransform
from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
# This script provides an expansion service and example ptransforms for running
# external transform test cases. See external_test.py for details.
@@ -296,7 +296,7 @@ def main(unused_argv):
'-p', '--port', type=int, help='port on which to serve the job api')
options = parser.parse_args()
global server
- server = grpc.server(UnboundedThreadPoolExecutor())
+ server = grpc.server(thread_pool_executor.shared_unbounded_instance())
beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
expansion_service.ExpansionServiceServicer(
PipelineOptions(
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 58db57c..c2147ce 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -64,7 +64,7 @@ from apache_beam.runners.portability.fn_api_runner.worker_handlers import Worker
from apache_beam.transforms import environments
from apache_beam.utils import profiler
from apache_beam.utils import proto_utils
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
if TYPE_CHECKING:
from apache_beam.pipeline import Pipeline
@@ -925,7 +925,7 @@ class ParallelBundleManager(BundleManager):
expected_output_timers,
dry_run)
- with UnboundedThreadPoolExecutor() as executor:
+ with thread_pool_executor.shared_unbounded_instance() as executor:
for result, split_result in executor.map(execute, zip(part_inputs, # pylint: disable=zip-builtin-not-iterating
timer_inputs)):
split_result_list += split_result
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
index 6808b47..6a27656 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py
@@ -63,7 +63,7 @@ from apache_beam.runners.worker.channel_factory import GRPCChannelFactory
from apache_beam.runners.worker.sdk_worker import _Future
from apache_beam.runners.worker.statecache import StateCache
from apache_beam.utils import proto_utils
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
# State caching is enabled in the fn_api_runner for testing, except for one
# test which runs without state caching (FnApiRunnerTestWithDisabledCaching).
@@ -441,7 +441,8 @@ class GrpcServer(object):
# type: (...) -> None
self.state = state
self.provision_info = provision_info
- self.control_server = grpc.server(UnboundedThreadPoolExecutor())
+ self.control_server = grpc.server(
+ thread_pool_executor.shared_unbounded_instance())
self.control_port = self.control_server.add_insecure_port('[::]:0')
self.control_address = 'localhost:%s' % self.control_port
@@ -451,11 +452,13 @@ class GrpcServer(object):
no_max_message_sizes = [("grpc.max_receive_message_length", -1),
("grpc.max_send_message_length", -1)]
self.data_server = grpc.server(
- UnboundedThreadPoolExecutor(), options=no_max_message_sizes)
+ thread_pool_executor.shared_unbounded_instance(),
+ options=no_max_message_sizes)
self.data_port = self.data_server.add_insecure_port('[::]:0')
self.state_server = grpc.server(
- UnboundedThreadPoolExecutor(), options=no_max_message_sizes)
+ thread_pool_executor.shared_unbounded_instance(),
+ options=no_max_message_sizes)
self.state_port = self.state_server.add_insecure_port('[::]:0')
self.control_handler = BeamFnControlServicer(worker_manager)
@@ -493,7 +496,8 @@ class GrpcServer(object):
GrpcStateServicer(state), self.state_server)
self.logging_server = grpc.server(
- UnboundedThreadPoolExecutor(), options=no_max_message_sizes)
+ thread_pool_executor.shared_unbounded_instance(),
+ options=no_max_message_sizes)
self.logging_port = self.logging_server.add_insecure_port('[::]:0')
beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
BasicLoggingService(), self.logging_server)
diff --git a/sdks/python/apache_beam/runners/portability/local_job_service.py b/sdks/python/apache_beam/runners/portability/local_job_service.py
index b99bfeb..dcdc442 100644
--- a/sdks/python/apache_beam/runners/portability/local_job_service.py
+++ b/sdks/python/apache_beam/runners/portability/local_job_service.py
@@ -48,7 +48,7 @@ from apache_beam.runners.portability import abstract_job_service
from apache_beam.runners.portability import artifact_service
from apache_beam.runners.portability.fn_api_runner import fn_runner
from apache_beam.runners.portability.fn_api_runner import worker_handlers
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
if TYPE_CHECKING:
from google.protobuf import struct_pb2 # pylint: disable=ungrouped-imports
@@ -141,7 +141,7 @@ class LocalJobServicer(abstract_job_service.AbstractJobServiceServicer):
return 'localhost'
def start_grpc_server(self, port=0):
- self._server = grpc.server(UnboundedThreadPoolExecutor())
+ self._server = grpc.server(thread_pool_executor.shared_unbounded_instance())
port = self._server.add_insecure_port(
'%s:%d' % (self.get_bind_address(), port))
beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
@@ -194,7 +194,8 @@ class SubprocessSdkWorker(object):
self._worker_id = worker_id
def run(self):
- logging_server = grpc.server(UnboundedThreadPoolExecutor())
+ logging_server = grpc.server(
+ thread_pool_executor.shared_unbounded_instance())
logging_port = logging_server.add_insecure_port('[::]:0')
logging_server.start()
logging_servicer = BeamFnLoggingServicer()
diff --git a/sdks/python/apache_beam/runners/portability/portable_stager_test.py b/sdks/python/apache_beam/runners/portability/portable_stager_test.py
index c7a8e1c..44fb498 100644
--- a/sdks/python/apache_beam/runners/portability/portable_stager_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_stager_test.py
@@ -36,7 +36,7 @@ import grpc
from apache_beam.portability.api import beam_artifact_api_pb2
from apache_beam.portability.api import beam_artifact_api_pb2_grpc
from apache_beam.runners.portability import portable_stager
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
class PortableStagerTest(unittest.TestCase):
@@ -58,7 +58,7 @@ class PortableStagerTest(unittest.TestCase):
describing the name of the artifacts in local temp folder and desired
name in staging location.
"""
- server = grpc.server(UnboundedThreadPoolExecutor())
+ server = grpc.server(thread_pool_executor.shared_unbounded_instance())
staging_service = TestLocalFileSystemLegacyArtifactStagingServiceServicer(
self._remote_dir)
beam_artifact_api_pb2_grpc.add_LegacyArtifactStagingServiceServicer_to_server(
diff --git a/sdks/python/apache_beam/runners/worker/data_plane_test.py b/sdks/python/apache_beam/runners/worker/data_plane_test.py
index a4922a5..4f09df3 100644
--- a/sdks/python/apache_beam/runners/worker/data_plane_test.py
+++ b/sdks/python/apache_beam/runners/worker/data_plane_test.py
@@ -34,7 +34,7 @@ from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker import data_plane
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.testing.util import timeout
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
class DataChannelTest(unittest.TestCase):
@@ -56,7 +56,7 @@ class DataChannelTest(unittest.TestCase):
data_channel_service = \
data_servicer.get_conn_by_worker_id(worker_id)
- server = grpc.server(UnboundedThreadPoolExecutor())
+ server = grpc.server(thread_pool_executor.shared_unbounded_instance())
beam_fn_api_pb2_grpc.add_BeamFnDataServicer_to_server(data_servicer, server)
test_port = server.add_insecure_port('[::]:0')
server.start()
diff --git a/sdks/python/apache_beam/runners/worker/log_handler_test.py b/sdks/python/apache_beam/runners/worker/log_handler_test.py
index 4d9291d..80fe543 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler_test.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler_test.py
@@ -32,7 +32,7 @@ from apache_beam.portability.api import endpoints_pb2
from apache_beam.runners.common import NameContext
from apache_beam.runners.worker import log_handler
from apache_beam.runners.worker import statesampler
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
_LOGGER = logging.getLogger(__name__)
@@ -52,7 +52,7 @@ class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
class FnApiLogRecordHandlerTest(unittest.TestCase):
def setUp(self):
self.test_logging_service = BeamFnLoggingServicer()
- self.server = grpc.server(UnboundedThreadPoolExecutor())
+ self.server = grpc.server(thread_pool_executor.shared_unbounded_instance())
beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
self.test_logging_service, self.server)
self.test_port = self.server.add_insecure_port('[::]:0')
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker.py b/sdks/python/apache_beam/runners/worker/sdk_worker.py
index eed471d..a58dd2f 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -64,7 +64,7 @@ from apache_beam.runners.worker.data_plane import PeriodicThread
from apache_beam.runners.worker.statecache import StateCache
from apache_beam.runners.worker.worker_id_interceptor import WorkerIdInterceptor
from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
if TYPE_CHECKING:
from apache_beam.portability.api import endpoints_pb2
@@ -186,10 +186,11 @@ class SdkHarness(object):
else:
self._status_handler = None
- # TODO(BEAM-8998) use common UnboundedThreadPoolExecutor to process bundle
- # progress once dataflow runner's excessive progress polling is removed.
+ # TODO(BEAM-8998) use common
+ # thread_pool_executor.shared_unbounded_instance() to process bundle
+ # progress once dataflow runner's excessive progress polling is removed.
self._report_progress_executor = futures.ThreadPoolExecutor(max_workers=1)
- self._worker_thread_pool = UnboundedThreadPoolExecutor()
+ self._worker_thread_pool = thread_pool_executor.shared_unbounded_instance()
self._responses = queue.Queue(
) # type: queue.Queue[beam_fn_api_pb2.InstructionResponse]
_LOGGER.info('Initializing SDKHarness with unbounded number of workers.')
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
index a7c3bc3..d9b8083 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_test.py
@@ -38,7 +38,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability.api import metrics_pb2
from apache_beam.runners.worker import sdk_worker
from apache_beam.runners.worker import statecache
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
_LOGGER = logging.getLogger(__name__)
@@ -102,7 +102,7 @@ class SdkWorkerTest(unittest.TestCase):
test_controller = BeamFnControlServicer(requests)
- server = grpc.server(UnboundedThreadPoolExecutor())
+ server = grpc.server(thread_pool_executor.shared_unbounded_instance())
beam_fn_api_pb2_grpc.add_BeamFnControlServicer_to_server(
test_controller, server)
test_port = server.add_insecure_port("[::]:0")
diff --git a/sdks/python/apache_beam/runners/worker/worker_pool_main.py b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
index b18961a..f4a4728 100644
--- a/sdks/python/apache_beam/runners/worker/worker_pool_main.py
+++ b/sdks/python/apache_beam/runners/worker/worker_pool_main.py
@@ -48,7 +48,7 @@ import grpc
from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker import sdk_worker
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
_LOGGER = logging.getLogger(__name__)
@@ -78,7 +78,8 @@ class BeamFnExternalWorkerPoolServicer(
container_executable=None # type: Optional[str]
):
# type: (...) -> Tuple[str, grpc.Server]
- worker_server = grpc.server(UnboundedThreadPoolExecutor())
+ worker_server = grpc.server(
+ thread_pool_executor.shared_unbounded_instance())
worker_address = 'localhost:%s' % worker_server.add_insecure_port(
'[::]:%s' % port)
worker_pool = cls(
diff --git a/sdks/python/apache_beam/runners/worker/worker_status_test.py b/sdks/python/apache_beam/runners/worker/worker_status_test.py
index 0872de4..fea0d73 100644
--- a/sdks/python/apache_beam/runners/worker/worker_status_test.py
+++ b/sdks/python/apache_beam/runners/worker/worker_status_test.py
@@ -28,7 +28,7 @@ from apache_beam.portability.api import beam_fn_api_pb2
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.runners.worker.worker_status import FnApiWorkerStatusHandler
from apache_beam.testing.util import timeout
-from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
+from apache_beam.utils import thread_pool_executor
class BeamFnStatusServicer(beam_fn_api_pb2_grpc.BeamFnWorkerStatusServicer):
@@ -52,7 +52,7 @@ class FnApiWorkerStatusHandlerTest(unittest.TestCase):
def setUp(self):
self.num_request = 3
self.test_status_service = BeamFnStatusServicer(self.num_request)
- self.server = grpc.server(UnboundedThreadPoolExecutor())
+ self.server = grpc.server(thread_pool_executor.shared_unbounded_instance())
beam_fn_api_pb2_grpc.add_BeamFnWorkerStatusServicer_to_server(
self.test_status_service, self.server)
self.test_port = self.server.add_insecure_port('[::]:0')
diff --git a/sdks/python/apache_beam/utils/thread_pool_executor.py b/sdks/python/apache_beam/utils/thread_pool_executor.py
index a5443ed..c111b9a 100644
--- a/sdks/python/apache_beam/utils/thread_pool_executor.py
+++ b/sdks/python/apache_beam/utils/thread_pool_executor.py
@@ -134,3 +134,16 @@ class UnboundedThreadPoolExecutor(_base.Executor):
if wait:
for worker in self._workers:
worker.join()
+
+
+class _SharedUnboundedThreadPoolExecutor(UnboundedThreadPoolExecutor):
+ def shutdown(self, wait=True):
+ # Prevent shutting down the shared thread pool
+ pass
+
+
+_SHARED_UNBOUNDED_THREAD_POOL_EXECUTOR = _SharedUnboundedThreadPoolExecutor()
+
+
+def shared_unbounded_instance():
+ return _SHARED_UNBOUNDED_THREAD_POOL_EXECUTOR
diff --git a/sdks/python/apache_beam/utils/thread_pool_executor_test.py b/sdks/python/apache_beam/utils/thread_pool_executor_test.py
index d9bbae4..b9251ca 100644
--- a/sdks/python/apache_beam/utils/thread_pool_executor_test.py
+++ b/sdks/python/apache_beam/utils/thread_pool_executor_test.py
@@ -30,6 +30,7 @@ import unittest
# patches unittest.TestCase to be python3 compatible
import future.tests.base # pylint: disable=unused-import
+from apache_beam.utils import thread_pool_executor
from apache_beam.utils.thread_pool_executor import UnboundedThreadPoolExecutor
@@ -111,6 +112,20 @@ class UnboundedThreadPoolExecutorTest(unittest.TestCase):
with self._lock:
self.assertEqual(5, len(self._worker_idents))
+ def test_shared_shutdown_does_nothing(self):
+ thread_pool_executor.shared_unbounded_instance().shutdown()
+
+ futures = []
+ with thread_pool_executor.shared_unbounded_instance() as executor:
+ for _ in range(0, 5):
+ futures.append(executor.submit(self.append_and_sleep, 0.01))
+
+ for future in futures:
+ future.result(timeout=10)
+
+ with self._lock:
+ self.assertEqual(5, len(self._worker_idents))
+
if __name__ == '__main__':
unittest.main()