You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/12 10:11:10 UTC
[beam] branch master updated: [BEAM-6754] Add option to use subprocesses instead of threads in loopback environment (#7984)
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7a0312a [BEAM-6754] Add option to use subprocesses instead of threads in loopback environment (#7984)
7a0312a is described below
commit 7a0312aeec9156175900a34827952f824e265839
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Tue Mar 12 03:11:02 2019 -0700
[BEAM-6754] Add option to use subprocesses instead of threads in loopback environment (#7984)
---
.../runners/portability/portable_runner.py | 48 ++++++++++++++++------
1 file changed, 35 insertions(+), 13 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index c6f5591..eec5579 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -17,11 +17,13 @@
from __future__ import absolute_import
+import atexit
import functools
import itertools
import json
import logging
import os
+import subprocess
import threading
import time
from concurrent import futures
@@ -168,9 +170,13 @@ class PortableRunner(runner.PipelineRunner):
# This is needed as we start a worker server if one is requested
# but none is provided.
if portable_options.environment_type == 'LOOPBACK':
+ use_loopback_process_worker = options.view_as(
+ DebugOptions).lookup_experiment(
+ 'use_loopback_process_worker', False)
portable_options.environment_config, server = (
BeamFnExternalWorkerPoolServicer.start(
- sdk_worker_main._get_worker_count(options)))
+ sdk_worker_main._get_worker_count(options),
+ use_process=use_loopback_process_worker))
globals()['x'] = server
cleanup_callbacks = [functools.partial(server.stop, 1)]
else:
@@ -439,29 +445,45 @@ class PipelineResult(runner.PipelineResult):
class BeamFnExternalWorkerPoolServicer(
beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
- def __init__(self, worker_threads):
+ def __init__(self, worker_threads, use_process=False):
self._worker_threads = worker_threads
+ self._use_process = use_process
@classmethod
- def start(cls, worker_threads=1):
+ def start(cls, worker_threads=1, use_process=False):
worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
worker_address = 'localhost:%s' % worker_server.add_insecure_port('[::]:0')
beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
- cls(worker_threads), worker_server)
+ cls(worker_threads, use_process=use_process), worker_server)
worker_server.start()
return worker_address, worker_server
def NotifyRunnerAvailable(self, start_worker_request, context):
try:
- worker = sdk_worker.SdkHarness(
- start_worker_request.control_endpoint.url,
- worker_count=self._worker_threads,
- worker_id=start_worker_request.worker_id)
- worker_thread = threading.Thread(
- name='run_worker_%s' % start_worker_request.worker_id,
- target=worker.run)
- worker_thread.daemon = True
- worker_thread.start()
+ if self._use_process:
+ command = ['python', '-c',
+ 'from apache_beam.runners.worker.sdk_worker '
+ 'import SdkHarness; '
+ 'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+ start_worker_request.control_endpoint.url,
+ self._worker_threads,
+ start_worker_request.worker_id)]
+ logging.warn("Starting worker with command %s" % (command))
+ worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+
+ # Register to kill the subprocess on exit.
+ atexit.register(worker_process.kill)
+ else:
+ worker = sdk_worker.SdkHarness(
+ start_worker_request.control_endpoint.url,
+ worker_count=self._worker_threads,
+ worker_id=start_worker_request.worker_id)
+ worker_thread = threading.Thread(
+ name='run_worker_%s' % start_worker_request.worker_id,
+ target=worker.run)
+ worker_thread.daemon = True
+ worker_thread.start()
+
return beam_fn_api_pb2.NotifyRunnerAvailableResponse()
except Exception as exn:
return beam_fn_api_pb2.NotifyRunnerAvailableResponse(