You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by mx...@apache.org on 2019/03/12 10:11:10 UTC

[beam] branch master updated: [BEAM-6754] Add option to use subprocesses instead of threads in loopback environment (#7984)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a0312a  [BEAM-6754] Add option to use subprocesses instead of threads in loopback environment (#7984)
7a0312a is described below

commit 7a0312aeec9156175900a34827952f824e265839
Author: Ankur <an...@users.noreply.github.com>
AuthorDate: Tue Mar 12 03:11:02 2019 -0700

    [BEAM-6754] Add option to use subprocesses instead of threads in loopback environment (#7984)
---
 .../runners/portability/portable_runner.py         | 48 ++++++++++++++++------
 1 file changed, 35 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py
index c6f5591..eec5579 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner.py
@@ -17,11 +17,13 @@
 
 from __future__ import absolute_import
 
+import atexit
 import functools
 import itertools
 import json
 import logging
 import os
+import subprocess
 import threading
 import time
 from concurrent import futures
@@ -168,9 +170,13 @@ class PortableRunner(runner.PipelineRunner):
     # This is needed as we start a worker server if one is requested
     # but none is provided.
     if portable_options.environment_type == 'LOOPBACK':
+      use_loopback_process_worker = options.view_as(
+          DebugOptions).lookup_experiment(
+              'use_loopback_process_worker', False)
       portable_options.environment_config, server = (
           BeamFnExternalWorkerPoolServicer.start(
-              sdk_worker_main._get_worker_count(options)))
+              sdk_worker_main._get_worker_count(options),
+              use_process=use_loopback_process_worker))
       globals()['x'] = server
       cleanup_callbacks = [functools.partial(server.stop, 1)]
     else:
@@ -439,29 +445,45 @@ class PipelineResult(runner.PipelineResult):
 class BeamFnExternalWorkerPoolServicer(
     beam_fn_api_pb2_grpc.BeamFnExternalWorkerPoolServicer):
 
-  def __init__(self, worker_threads):
+  def __init__(self, worker_threads, use_process=False):
     self._worker_threads = worker_threads
+    self._use_process = use_process
 
   @classmethod
-  def start(cls, worker_threads=1):
+  def start(cls, worker_threads=1, use_process=False):
     worker_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
     worker_address = 'localhost:%s' % worker_server.add_insecure_port('[::]:0')
     beam_fn_api_pb2_grpc.add_BeamFnExternalWorkerPoolServicer_to_server(
-        cls(worker_threads), worker_server)
+        cls(worker_threads, use_process=use_process), worker_server)
     worker_server.start()
     return worker_address, worker_server
 
   def NotifyRunnerAvailable(self, start_worker_request, context):
     try:
-      worker = sdk_worker.SdkHarness(
-          start_worker_request.control_endpoint.url,
-          worker_count=self._worker_threads,
-          worker_id=start_worker_request.worker_id)
-      worker_thread = threading.Thread(
-          name='run_worker_%s' % start_worker_request.worker_id,
-          target=worker.run)
-      worker_thread.daemon = True
-      worker_thread.start()
+      if self._use_process:
+        command = ['python', '-c',
+                   'from apache_beam.runners.worker.sdk_worker '
+                   'import SdkHarness; '
+                   'SdkHarness("%s",worker_count=%d,worker_id="%s").run()' % (
+                       start_worker_request.control_endpoint.url,
+                       self._worker_threads,
+                       start_worker_request.worker_id)]
+        logging.warn("Starting worker with command %s" % (command))
+        worker_process = subprocess.Popen(command, stdout=subprocess.PIPE)
+
+        # Register to kill the subprocess on exit.
+        atexit.register(worker_process.kill)
+      else:
+        worker = sdk_worker.SdkHarness(
+            start_worker_request.control_endpoint.url,
+            worker_count=self._worker_threads,
+            worker_id=start_worker_request.worker_id)
+        worker_thread = threading.Thread(
+            name='run_worker_%s' % start_worker_request.worker_id,
+            target=worker.run)
+        worker_thread.daemon = True
+        worker_thread.start()
+
       return beam_fn_api_pb2.NotifyRunnerAvailableResponse()
     except Exception as exn:
       return beam_fn_api_pb2.NotifyRunnerAvailableResponse(