You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:07 UTC
[buildstream] 13/19: Move subprocess machinery into a method
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 8b32793fa8ce95273ee034829166e0697363d00e
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Jul 5 11:26:04 2019 +0100
Move subprocess machinery into a method
This gets around the pickling identity issues.
---
src/buildstream/_stream.py | 128 ++++++++++++++++++++++++++++++---------------
1 file changed, 87 insertions(+), 41 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0fe2234..e4f6bc0 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,7 +19,6 @@
# Jürg Billeter <ju...@codethink.co.uk>
# Tristan Maat <tr...@codethink.co.uk>
-import asyncio
import itertools
import functools
import multiprocessing as mp
@@ -48,38 +47,44 @@ from . import utils, _yaml, _site
from . import Scope, Consistency
-# A decorator which runs the decorated method to be run in a subprocess
-def subprocessed(func):
-
- @functools.wraps(func)
- def _subprocessed(self, *args, **kwargs):
- assert self
- print("Args: {}".format([*args]))
- print("Kwargs: {}".format(list(kwargs.items())))
- assert not self._subprocess
-
- # TODO use functools to pass arguments to func to make target for subprocess
-
- # Start subprocessed work
- mp_context = mp.get_context(method='spawn')
- process_name = "stream-{}".format(func.__name__)
- target = functools.partial(func, self, *args, **kwargs)
- print("launching subprocess:", process_name)
- self._subprocess = mp_context.Process(target=target, name=process_name)
- self._subprocess.start()
-
- # TODO connect signal handlers
-
- # Run event loop. This event loop should exit once the
- # subprocessed work has completed
- print("Starting loop...")
- while not self._subprocess.exitcode:
- self._loop()
- print("Stopping loop...")
-
- # Return result of subprocessed function
-
- return _subprocessed
+def _subprocessed(self, *args, **kwargs):
+ assert self
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ global notification_count
+ notification_count = 0
+ # TODO use functools to pass arguments to func to make target for subprocess
+
+ # Start subprocessed work
+ mp_context = mp.get_context(method='spawn')
+ process_name = "stream-{}".format(func.__name__)
+ print("launchinglaunching subprocess:", process_name)
+ print(func.__module__, func.__name__)
+ import buildstream
+ try:
+ assert func is buildstream._stream.Stream.build or func is Stream.build
+ except AssertionError:
+ print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+ self._subprocess.start()
+
+ # TODO connect signal handlers
+
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+ print("Starting loop...")
+ while not self._subprocess.exitcode:
+ self._loop()
+ print("Stopping loop...")
+
+ try:
+ while True:
+ notification = self.notification_queue.get()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ pass
# Stream()
@@ -141,6 +146,46 @@ class Stream():
def init(self):
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
+ print(Stream.build, Stream.build.__qualname__, Stream.build.__name__, Stream.build.__module__, id(Stream.build))
+
+
+ def run_in_subprocess(self, func, *args, **kwargs):
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ global notification_count
+ notification_count = 0
+ # TODO use functools to pass arguments to func to make target for subprocess
+
+ # Start subprocessed work
+ mp_context = mp.get_context(method='fork')
+ process_name = "stream-{}".format(func.__name__)
+ print("launchinglaunching subprocess:", process_name)
+ print(func.__module__, func.__name__)
+ import buildstream
+ try:
+ assert func is buildstream._stream.Stream.build or func is Stream.build
+ except AssertionError:
+ print(func, func.__qualname__, func.__name__, func.__module__, id(func))
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+ self._subprocess.start()
+
+ # TODO connect signal handlers
+
+ self._subprocess = mp_context.Process(target=func, args=args, kwargs=kwargs, name=process_name)
+
+ print("Starting loop...")
+ while not self._subprocess.exitcode:
+ self._loop()
+ print("Stopping loop...")
+
+ try:
+ while True:
+ notification = self.notification_queue.get()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ pass
# cleanup()
#
@@ -265,6 +310,8 @@ class Stream():
return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
usebuildtree=buildtree)
+ def build(self, *args, **kwargs):
+ self.run_in_subprocess(self._build, *args, **kwargs)
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -282,14 +329,13 @@ class Stream():
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
- @subprocessed
- def build(self, targets, *,
- track_targets=None,
- track_except=None,
- track_cross_junctions=False,
- ignore_junction_targets=False,
- build_all=False,
- remote=None):
+ def _build(self, targets, *,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ ignore_junction_targets=False,
+ build_all=False,
+ remote=None):
if build_all:
selection = PipelineSelection.ALL