You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:18:12 UTC

[buildstream] branch bschubert/no-multiprocessing-full created (now 5b948b8)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 5b948b8  WIP

This branch includes the following new commits:

     new 8d91b97  _signals.py: allow calling signal handler from non-main threads
     new 065e563  _fixtures.py: Only get the normal number of threads at the start of session
     new 1e6467a  scheduler.py: Use threads instead of processes for jobs
     new 349f97d  _messenger.py: Make `timed_suspendable` public and use it in job.py
     new c42507e  job.py: Remove 'SUBCLASS_CUSTOM_MESSAGE', it is never used
     new 25a9b2e  job.py: Remove the ability to send back child data
     new 8db1e97  _messenger.py: Add type annotations on the module to help refactors
     new 58986f9  _messenger.py: Make the messenger aware of jobs and stop having multiple
     new 8b72daf  job.py: Don't pass the errors through the queue, we can set it directly
     new 552a40b  job.py: Completely remove the need for a queue between parent and child jobs
     new 5e118e2  WIP
     new d211abc  plugin.py: Add a helper to run blocking processes in subprocesses
     new 5b948b8  WIP

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 05/13: job.py: Remove 'SUBCLASS_CUSTOM_MESSAGE', it is never used

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c42507e0eb9441e805e947096ac8f7f441fa12fc
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jul 6 18:52:21 2020 +0000

    job.py: Remove 'SUBCLASS_CUSTOM_MESSAGE', it is never used
---
 src/buildstream/_scheduler/jobs/job.py | 38 ----------------------------------
 1 file changed, 38 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 03a6b61..29f2e36 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -71,7 +71,6 @@ class _MessageType(FastEnum):
     ERROR = 2
     RESULT = 3
     CHILD_DATA = 4
-    SUBCLASS_CUSTOM_MESSAGE = 5
 
 
 # Job()
@@ -273,22 +272,6 @@ class Job:
     #                  Abstract Methods                   #
     #######################################################
 
-    # handle_message()
-    #
-    # Handle a custom message. This will be called in the main process in
-    # response to any messages sent to the main process using the
-    # Job.send_message() API from inside a Job.child_process() implementation.
-    #
-    # There is no need to implement this function if no custom messages are
-    # expected.
-    #
-    # Args:
-    #    message (any): A simple object (must be pickle-able, i.e. strings,
-    #                   lists, dicts, numbers, but not Element instances).
-    #
-    def handle_message(self, message):
-        raise ImplError("Job '{kind}' does not implement handle_message()".format(kind=type(self).__name__))
-
     # parent_complete()
     #
     # This will be executed in the main process after the job finishes, and is
@@ -423,8 +406,6 @@ class Job:
         elif envelope.message_type is _MessageType.CHILD_DATA:
             # If we retry a job, we assign a new value to this
             self.child_data = envelope.message
-        elif envelope.message_type is _MessageType.SUBCLASS_CUSTOM_MESSAGE:
-            self.handle_message(envelope.message)
         else:
             assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
 
@@ -536,25 +517,6 @@ class ChildJob:
             Message(message_type, message, element_name=element_name, element_key=element_key, **kwargs)
         )
 
-    # send_message()
-    #
-    # Send data in a message to the parent Job, running in the main process.
-    #
-    # This allows for custom inter-process communication between subclasses of
-    # Job and ChildJob.
-    #
-    # These messages will be processed by the Job.handle_message()
-    # implementation, which may be overridden to support one or more custom
-    # 'message_type's.
-    #
-    # Args:
-    #    message_data (any): A simple object (must be pickle-able, i.e.
-    #                        strings, lists, dicts, numbers, but not Element
-    #                        instances). This is sent to the parent Job.
-    #
-    def send_message(self, message_data):
-        self._send_message(_MessageType.SUBCLASS_CUSTOM_MESSAGE, message_data)
-
     #######################################################
     #                  Abstract Methods                   #
     #######################################################


[buildstream] 13/13: WIP

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5b948b8d96028c7850ec2bdaa19aff3dde09e790
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Fri Jul 10 07:55:03 2020 +0000

    WIP
---
 src/buildstream/plugin.py | 79 ++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 72 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 0ed6d7d..467c955 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -112,13 +112,14 @@ Class Reference
 import itertools
 import multiprocessing
 import os
+import signal
 import subprocess
 import sys
 from contextlib import contextmanager
 from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING
 from weakref import WeakValueDictionary
 
-from . import utils
+from . import utils, _signals
 from ._exceptions import PluginError, ImplError
 from ._message import Message, MessageType
 from .node import MappingNode, ProvenanceInformation
@@ -137,6 +138,18 @@ T2 = TypeVar("T2")
 
 
 def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None:
+    # This avoids some SIGTSTP signals from grandchildren
+    # getting propagated up to the master process
+    os.setsid()
+
+    # First set back to the default signal handlers for the signals
+    # we handle, and then clear their blocked state.
+    #
+    signal_list = [signal.SIGTSTP, signal.SIGTERM]
+    for sig in signal_list:
+        signal.signal(sig, signal.SIG_DFL)
+    signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
+
     queue.put(target(*args))
 
 
@@ -520,15 +533,67 @@ class Plugin:
         ):
             queue = self.__multiprocessing_context.Queue()
 
-            proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
-            proc.start()
-
-            result = queue.get()
-            proc.join()
+            process = None
+
+            from .utils import _kill_process_tree
+            import psutil
+
+            # Handle termination, suspend and resume
+            def kill_proc():
+                if not process:
+                    return
+
+                proc = psutil.Process(process.pid)
+
+                # Some callers know that their subprocess can be
+                # gracefully terminated, make an attempt first
+                proc.terminate()
+
+                try:
+                    proc.wait(5)
+                except psutil.TimeoutExpired:
+                    # Did not terminate within the timeout: murder
+                    _kill_process_tree(proc.pid)
+
+                else:
+                    # FIXME: This is a brutal but reliable approach
+                    #
+                    # Other variations I've tried which try SIGTERM first
+                    # and then wait for child processes to exit gracefully
+                    # have not reliably cleaned up process trees and have
+                    # left orphaned git or ssh processes alive.
+                    #
+                    # This cleans up the subprocesses reliably but may
+                    # cause side effects such as possibly leaving stale
+                    # locks behind. Hopefully this should not be an issue
+                    # as long as any child processes only interact with
+                    # the temp directories which we control and cleanup
+                    # ourselves.
+                    #
+                    _kill_process_tree(proc.pid)
+
+            def suspend_proc():
+                if process:
+                    group_id = os.getpgid(process.pid)
+                    os.killpg(group_id, signal.SIGSTOP)
+
+            def resume_proc():
+                if process:
+                    group_id = os.getpgid(process.pid)
+                    os.killpg(group_id, signal.SIGCONT)
+
+            with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+
+                process = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
+
+                with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+                    process.start()
+
+                result = queue.get()
+                process.join()
 
             return result
 
-
     def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
         """A wrapper for subprocess.call()
 


[buildstream] 06/13: job.py: Remove the ability to send back child data

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 25a9b2e132fffa2f9c34ad0051204470cc23ce08
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Tue Jul 7 17:58:50 2020 +0100

    job.py: Remove the ability to send back child data
    
    We only use this for workspace information that we can now get directly
    since we run in the same process
---
 src/buildstream/_scheduler/jobs/elementjob.py |  9 ---------
 src/buildstream/_scheduler/jobs/job.py        | 22 ----------------------
 src/buildstream/_scheduler/queues/queue.py    | 10 +++++-----
 3 files changed, 5 insertions(+), 36 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 6831295..c72be40 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -91,12 +91,3 @@ class ChildElementJob(ChildJob):
 
         # Run the action
         return self._action_cb(self._element)
-
-    def child_process_data(self):
-        data = {}
-
-        workspace = self._element._get_workspace()
-        if workspace is not None:
-            data["workspace"] = workspace.to_dict()
-
-        return data
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 29f2e36..f331d3f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -70,7 +70,6 @@ class _MessageType(FastEnum):
     LOG_MESSAGE = 1
     ERROR = 2
     RESULT = 3
-    CHILD_DATA = 4
 
 
 # Job()
@@ -116,7 +115,6 @@ class Job:
         #
         self.name = None  # The name of the job, set by the job's subclass
         self.action_name = action_name  # The action name for the Queue
-        self.child_data = None  # Data to be sent to the main process
 
         #
         # Private members
@@ -403,9 +401,6 @@ class Job:
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
-        elif envelope.message_type is _MessageType.CHILD_DATA:
-            # If we retry a job, we assign a new value to this
-            self.child_data = envelope.message
         else:
             assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
 
@@ -535,20 +530,6 @@ class ChildJob:
     def child_process(self):
         raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))
 
-    # child_process_data()
-    #
-    # Abstract method to retrieve additional data that should be
-    # returned to the parent process. Note that the job result is
-    # retrieved independently.
-    #
-    # Values can later be retrieved in Job.child_data.
-    #
-    # Returns:
-    #    (dict) A dict containing values to be reported to the main process
-    #
-    def child_process_data(self):
-        return {}
-
     # child_action()
     #
     # Perform the action in the child process, this calls the action_cb.
@@ -596,8 +577,6 @@ class ChildJob:
                         MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox
                     )
 
-                self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
-
                 # Report the exception to the parent (for internal testing purposes)
                 self._child_send_error(e)
 
@@ -620,7 +599,6 @@ class ChildJob:
 
             else:
                 # No exception occurred in the action
-                self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - timeinfo.start_time
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 9e444b3..96a7016 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -268,16 +268,16 @@ class Queue:
     #    job (Job): The job which completed
     #
     def _update_workspaces(self, element, job):
-        workspace_dict = None
-        if job.child_data:
-            workspace_dict = job.child_data.get("workspace", None)
+        # FIXME: This should be done only for build jobs and not by prodding
+        #        the element, but as an explicit return value from the job
+        workspace = element._get_workspace()
 
         # Handle any workspace modifications now
         #
-        if workspace_dict:
+        if workspace:
             context = element._get_context()
             workspaces = context.get_workspaces()
-            if workspaces.update_workspace(element._get_full_name(), workspace_dict):
+            if workspaces.update_workspace(element._get_full_name(), workspace.to_dict()):
                 try:
                     workspaces.save_config()
                 except BstError as e:


[buildstream] 08/13: _messenger.py: Make the messenger aware of jobs and stop having multiple

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 58986f982d34177f753feaf9501038e43200bd73
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 19:17:05 2020 +0000

    _messenger.py: Make the messenger aware of jobs and stop having multiple
---
 src/buildstream/_frontend/app.py       |   8 +-
 src/buildstream/_messenger.py          | 245 +++++++++++++++++++--------------
 src/buildstream/_scheduler/jobs/job.py |  65 ++-------
 tests/testutils/context.py             |   2 +-
 4 files changed, 154 insertions(+), 166 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 5d49e96..88c11c1 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -34,7 +34,7 @@ from .._context import Context
 from .._project import Project
 from .._exceptions import BstError, StreamError, LoadError, AppError
 from ..exceptions import LoadErrorReason
-from .._message import Message, MessageType, unconditional_messages
+from .._message import Message, MessageType
 from .._stream import Stream
 from ..types import _SchedulerErrorAction
 from .. import node
@@ -791,7 +791,7 @@ class App:
     #
     # Handle messages from the pipeline
     #
-    def _message_handler(self, message, is_silenced):
+    def _message_handler(self, message):
 
         # Drop status messages from the UI if not verbose, we'll still see
         # info messages and status messages will still go to the log files.
@@ -802,10 +802,6 @@ class App:
         if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.element_name is not None:
             self._fail_messages[message.element_name] = message
 
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
         # Format the message & cache it
         text = self.logger.render(message)
         self._message_text += text
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index eb3bd51..3220cb1 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -21,11 +21,11 @@ import os
 import datetime
 import threading
 from contextlib import contextmanager
-from typing import Callable, Generator, Optional
+from typing import Callable, Generator, Optional, TextIO
 
 from . import _signals
 from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
 from ._state import State, _Task
 
 
@@ -48,9 +48,86 @@ class _TimeData:
         self.start_time = start_time
 
 
-class MessageHandlerCallback:
-    def __call__(self, message: Message, is_silenced: bool) -> None:
-        pass
+class _JobRecorder:
+    def __init__(self, action_name: str, element_key: str, log_filename: str) -> None:
+        self.action_name = action_name
+        self.element_key = element_key
+        self.log_filename = log_filename
+
+        self.log_handle: Optional[TextIO] = None
+        self.silence_scope_depth = 0
+
+    @contextmanager
+    def enable_recording(self) -> Generator["_JobRecorder", None, None]:
+        # Ensure the directory exists first
+        directory = os.path.dirname(self.log_filename)
+        os.makedirs(directory, exist_ok=True)
+
+        with open(self.log_filename, "a") as logfile:
+
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently had something happening in the I/O stack
+                # then trying to reenter the I/O stack will fire a runtime error.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    logfile.write("\n\nForcefully terminated\n")
+                    logfile.flush()
+                except RuntimeError:
+                    os.fsync(logfile.fileno())
+
+            self.log_handle = logfile
+
+            with _signals.terminator(flush_log):
+                yield self
+
+    # record_message()
+    #
+    # Records the message if recording is enabled
+    #
+    # Args:
+    #    message (Message): The message to record
+    #
+    def record_message(self, message: Message) -> None:
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7}"
+
+        # If this message is associated with an element or source plugin, print the
+        # full element name of the instance.
+        element_name = ""
+        if message.element_name:
+            template += " {element_name}"
+            element_name = message.element_name
+
+        template += ": {message}"
+
+        detail = ""
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip("\n")
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        text = template.format(
+            timecode=timecode,
+            element_name=element_name,
+            type=message.message_type.upper(),
+            message=message.message,
+            detail=detail,
+        )
+
+        # Write to the open log file
+        assert self.log_handle is not None
+        self.log_handle.write("{}\n".format(text))
+        self.log_handle.flush()
 
 
 class Messenger:
@@ -60,11 +137,11 @@ class Messenger:
         self._active_simple_tasks: int = 0
         self._render_status_cb: Optional[Callable[[], None]] = None
 
+        self._message_handler: Optional[Callable[[Message], None]] = None
+        self._global_silence_scope_depth = 0
+
         self._locals = threading.local()
-        self._locals.message_handler = None
-        self._locals.log_handle = None
-        self._locals.log_filename = None
-        self._locals.silence_scope_depth = 0
+        self._locals.job = None
 
     # set_message_handler()
     #
@@ -74,8 +151,8 @@ class Messenger:
     # Args:
     #   handler: The handler to call on message
     #
-    def set_message_handler(self, handler: MessageHandlerCallback) -> None:
-        self._locals.message_handler = handler
+    def set_message_handler(self, handler: Callable[[Message], None]) -> None:
+        self._message_handler = handler
 
     # set_state()
     #
@@ -102,7 +179,9 @@ class Messenger:
     # Returns: Whether messages are currently being silenced
     #
     def _silent_messages(self) -> bool:
-        return self._locals.silence_scope_depth > 0
+        if self._locals.job is not None:
+            return self._locals.job.silence_scope_depth > 0
+        return self._global_silence_scope_depth > 0
 
     # message():
     #
@@ -113,15 +192,30 @@ class Messenger:
     #    message: A Message object
     #
     def message(self, message: Message) -> None:
-        # If we are recording messages, dump a copy into the open log file.
-        self._record_message(message)
+        job = self._locals.job
+
+        if job is not None:
+            message.action_name = job.action_name
+            message.logfile = job.log_filename
 
-        # Send it off to the log handler (can be the frontend,
-        # or it can be the child task which will propagate
-        # to the frontend)
-        assert self._locals.message_handler
+            # If no key has been set at this point, and the element job has
+            # a related key, set it.
+            if message.element_key is None:
+                message.element_key = job.element_key
 
-        self._locals.message_handler(message, is_silenced=self._silent_messages())
+            # Jobs always record messages
+            self._locals.job.record_message(message)
+
+            # Don't log LOG messages from jobs
+            if message.message_type == MessageType.LOG:
+                return
+
+        # Don't forward if it is currently silent
+        if self._silent_messages() and (message.message_type not in unconditional_messages):
+            return
+
+        assert self._message_handler is not None
+        self._message_handler(message)
 
     # silence()
     #
@@ -140,12 +234,22 @@ class Messenger:
             yield
             return
 
-        self._locals.silence_scope_depth += 1
+        in_job = self._locals.job is not None
+
+        if in_job:
+            self._locals.job.silence_scope_depth += 1
+        else:
+            self._global_silence_scope_depth += 1
+
         try:
             yield
         finally:
-            assert self._locals.silence_scope_depth > 0
-            self._locals.silence_scope_depth -= 1
+            if in_job:
+                assert self._locals.job.silence_scope_depth > 0
+                self._locals.job.silence_scope_depth -= 1
+            else:
+                assert self._global_silence_scope_depth > 0
+                self._global_silence_scope_depth -= 1
 
     # timed_activity()
     #
@@ -254,7 +358,7 @@ class Messenger:
             )
             self.message(message)
 
-    # recorded_messages()
+    # record_job()
     #
     # Records all messages in a log file while the context manager
     # is active.
@@ -274,42 +378,20 @@ class Messenger:
     # Yields: The fully qualified log filename
     #
     @contextmanager
-    def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
+    def record_job(
+        self, action_name: str, element_key: str, filename: str, logdir: str
+    ) -> Generator[_JobRecorder, None, None]:
         # We dont allow recursing in this context manager, and
         # we also do not allow it in the main process.
-        assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
-        assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
+        assert not hasattr(self._locals, "job") or self._locals.job is None
 
-        # Create the fully qualified logfile in the log directory,
-        # appending the pid and .log extension at the end.
-        self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
-        self._locals.silence_scope_depth = 0
+        log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+        self._locals.job = _JobRecorder(action_name, element_key, log_filename)
 
-        # Ensure the directory exists first
-        directory = os.path.dirname(self._locals.log_filename)
-        os.makedirs(directory, exist_ok=True)
-
-        with open(self._locals.log_filename, "a") as logfile:
+        with self._locals.job.enable_recording() as job:
+            yield job
 
-            # Write one last line to the log and flush it to disk
-            def flush_log():
-
-                # If the process currently had something happening in the I/O stack
-                # then trying to reenter the I/O stack will fire a runtime error.
-                #
-                # So just try to flush as well as we can at SIGTERM time
-                try:
-                    logfile.write("\n\nForcefully terminated\n")
-                    logfile.flush()
-                except RuntimeError:
-                    os.fsync(logfile.fileno())
-
-            self._locals.log_handle = logfile
-            with _signals.terminator(flush_log):
-                yield self._locals.log_filename
-
-            self._locals.log_handle = None
-            self._locals.log_filename = None
+        self._locals.job = None
 
     # get_log_handle()
     #
@@ -320,7 +402,9 @@ class Messenger:
     # Returns: The active logging file handle, or None
     #
     def get_log_handle(self) -> Optional[str]:
-        return self._locals.log_handle
+        if self._locals.job is not None:
+            return self._locals.job.log_handle
+        return None
 
     # get_log_filename()
     #
@@ -331,7 +415,7 @@ class Messenger:
     # Returns: The active logging filename, or None
     #
     def get_log_filename(self) -> str:
-        return self._locals.log_filename
+        return self._locals.job.log_filename
 
     # timed_suspendable()
     #
@@ -361,55 +445,6 @@ class Messenger:
         with _signals.suspendable(stop_time, resume_time):
             yield timedata
 
-    # _record_message()
-    #
-    # Records the message if recording is enabled
-    #
-    # Args:
-    #    message (Message): The message to record
-    #
-    def _record_message(self, message: Message) -> None:
-
-        if self._locals.log_handle is None:
-            return
-
-        INDENT = "    "
-        EMPTYTIME = "--:--:--"
-        template = "[{timecode: <8}] {type: <7}"
-
-        # If this message is associated with an element or source plugin, print the
-        # full element name of the instance.
-        element_name = ""
-        if message.element_name:
-            template += " {element_name}"
-            element_name = message.element_name
-
-        template += ": {message}"
-
-        detail = ""
-        if message.detail is not None:
-            template += "\n\n{detail}"
-            detail = message.detail.rstrip("\n")
-            detail = INDENT + INDENT.join(detail.splitlines(True))
-
-        timecode = EMPTYTIME
-        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
-            minutes, seconds = divmod(remainder, 60)
-            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-        text = template.format(
-            timecode=timecode,
-            element_name=element_name,
-            type=message.message_type.upper(),
-            message=message.message,
-            detail=detail,
-        )
-
-        # Write to the open log file
-        self._locals.log_handle.write("{}\n".format(text))
-        self._locals.log_handle.flush()
-
     # _render_status()
     #
     # Calls the render status callback set in the messenger, but only if a
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index f331d3f..30308a9 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -28,7 +28,7 @@ import traceback
 
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
 from ...types import FastEnum
 
 
@@ -67,7 +67,6 @@ class _Envelope:
 
 
 class _MessageType(FastEnum):
-    LOG_MESSAGE = 1
     ERROR = 2
     RESULT = 3
 
@@ -389,11 +388,7 @@ class Job:
         if not self._listening:
             return
 
-        if envelope.message_type is _MessageType.LOG_MESSAGE:
-            # Propagate received messages from children
-            # back through the context.
-            self._messenger.message(envelope.message)
-        elif envelope.message_type is _MessageType.ERROR:
+        if envelope.message_type is _MessageType.ERROR:
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
@@ -543,21 +538,20 @@ class ChildJob:
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._pipe_w = pipe_w
-        self._messenger.set_message_handler(self._child_message_handler)
 
         # Time, log and and run the action function
         #
-        with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
-            self._logfile, self._logdir
-        ) as filename:
-            self.message(MessageType.START, self.action_name, logfile=filename)
+        with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
+            self.action_name, self._message_element_key, self._logfile, self._logdir
+        ):
+            self.message(MessageType.START, self.action_name)
 
             try:
                 # Try the task action
                 result = self.child_process()  # pylint: disable=assignment-from-no-return
             except SkipJob as e:
                 elapsed = datetime.datetime.now() - timeinfo.start_time
-                self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
+                self.message(MessageType.SKIPPED, str(e), elapsed=elapsed)
 
                 # Alert parent of skip by return code
                 return _ReturnCode.SKIPPED
@@ -567,15 +561,10 @@ class ChildJob:
 
                 if retry_flag and (self._tries <= self._max_retries):
                     self.message(
-                        MessageType.FAIL,
-                        "Try #{} failed, retrying".format(self._tries),
-                        elapsed=elapsed,
-                        logfile=filename,
+                        MessageType.FAIL, "Try #{} failed, retrying".format(self._tries), elapsed=elapsed,
                     )
                 else:
-                    self.message(
-                        MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox
-                    )
+                    self.message(MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, sandbox=e.sandbox)
 
                 # Report the exception to the parent (for internal testing purposes)
                 self._child_send_error(e)
@@ -593,7 +582,7 @@ class ChildJob:
                 elapsed = datetime.datetime.now() - timeinfo.start_time
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
 
-                self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
+                self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail)
                 # Unhandled exceptions should permenantly fail
                 return _ReturnCode.PERM_FAIL
 
@@ -602,7 +591,7 @@ class ChildJob:
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - timeinfo.start_time
-                self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+                self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed)
 
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
@@ -660,35 +649,3 @@ class ChildJob:
     def _child_send_result(self, result):
         if result is not None:
             self._send_message(_MessageType.RESULT, result)
-
-    # _child_message_handler()
-    #
-    # A Context delegate for handling messages, this replaces the
-    # frontend's main message handler in the context of a child task
-    # and performs local logging to the local log file before sending
-    # the message back to the parent process for further propagation.
-    # The related element display key is added to the message for
-    # widget rendering if not already set for an element childjob.
-    #
-    # Args:
-    #    message     (Message): The message to log
-    #    is_silenced (bool)   : Whether messages are silenced
-    #
-    def _child_message_handler(self, message, is_silenced):
-
-        message.action_name = self.action_name
-
-        # If no key has been set at this point, and the element job has
-        # a related key, set it. This is needed for messages going
-        # straight to the message handler from the child process.
-        if message.element_key is None and self._message_element_key:
-            message.element_key = self._message_element_key
-
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
-        if message.message_type == MessageType.LOG:
-            return
-
-        self._send_message(_MessageType.LOG_MESSAGE, message)
diff --git a/tests/testutils/context.py b/tests/testutils/context.py
index 821adef..ab14c1b 100644
--- a/tests/testutils/context.py
+++ b/tests/testutils/context.py
@@ -23,7 +23,7 @@ from buildstream._context import Context
 
 
 # Handle messages from the pipeline
-def _dummy_message_handler(message, is_silenced):
+def _dummy_message_handler(message):
     pass
 
 


[buildstream] 03/13: scheduler.py: Use threads instead of processes for jobs

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 1e6467ad04a1f1627bb1e5336acce1ebc7c2689c
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Fri Jul 3 12:57:06 2020 +0000

    scheduler.py: Use threads instead of processes for jobs
    
    This changes the scheduler to run jobs in threads instead of in
    subprocesses, and adapts all the code that needs adapting for that.
    Running in threads helps with Windows support and will allow some
    simplifications in the main pipeline.
---
 src/buildstream/_cas/cascache.py               |  67 ++++-------
 src/buildstream/_cas/casdprocessmanager.py     |  79 +++++++------
 src/buildstream/_context.py                    |  14 ---
 src/buildstream/_messenger.py                  |  58 +++++-----
 src/buildstream/_remote.py                     |  56 ++++-----
 src/buildstream/_scheduler/_multiprocessing.py |  79 -------------
 src/buildstream/_scheduler/jobs/job.py         | 151 ++++++-------------------
 src/buildstream/_scheduler/scheduler.py        |  69 +++++------
 src/buildstream/downloadablefilesource.py      |   5 +
 src/buildstream/source.py                      |   8 --
 src/buildstream/testing/_fixtures.py           |   1 +
 tests/internals/cascache.py                    |   9 ++
 12 files changed, 211 insertions(+), 385 deletions(-)

diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py
index 7936121..b4e2063 100644
--- a/src/buildstream/_cas/cascache.py
+++ b/src/buildstream/_cas/cascache.py
@@ -22,11 +22,9 @@ import itertools
 import os
 import stat
 import contextlib
-import ctypes
-import multiprocessing
-import signal
 import time
 from typing import Optional, List
+import threading
 
 import grpc
 
@@ -34,7 +32,7 @@ from .._protos.google.rpc import code_pb2
 from .._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from .._protos.build.buildgrid import local_cas_pb2
 
-from .. import _signals, utils
+from .. import utils
 from ..types import FastEnum, SourceRef
 from .._exceptions import CASCacheError
 
@@ -93,6 +91,7 @@ class CASCache:
 
             self._casd_channel = self._casd_process_manager.create_channel()
             self._cache_usage_monitor = _CASCacheUsageMonitor(self._casd_channel)
+            self._cache_usage_monitor.start()
 
     # get_cas():
     #
@@ -132,7 +131,8 @@ class CASCache:
     #
     def release_resources(self, messenger=None):
         if self._cache_usage_monitor:
-            self._cache_usage_monitor.release_resources()
+            self._cache_usage_monitor.stop()
+            self._cache_usage_monitor.join()
 
         if self._casd_process_manager:
             self.close_grpc_channels()
@@ -731,65 +731,42 @@ class _CASCacheUsage:
 # This manages the subprocess that tracks cache usage information via
 # buildbox-casd.
 #
-class _CASCacheUsageMonitor:
+class _CASCacheUsageMonitor(threading.Thread):
     def __init__(self, connection):
+        super().__init__()
         self._connection = connection
-
-        # Shared memory (64-bit signed integer) for current disk usage and quota
-        self._disk_usage = multiprocessing.Value(ctypes.c_longlong, -1)
-        self._disk_quota = multiprocessing.Value(ctypes.c_longlong, -1)
-
-        # multiprocessing.Process will fork without exec on Unix.
-        # This can't be allowed with background threads or open gRPC channels.
-        assert utils._is_single_threaded() and connection.is_closed()
-
-        # Block SIGINT, we don't want to kill the process when we interrupt the frontend
-        # and this process if very lightweight.
-        with _signals.blocked([signal.SIGINT], ignore=False):
-            self._subprocess = multiprocessing.Process(target=self._subprocess_run)
-            self._subprocess.start()
+        self._disk_usage = None
+        self._disk_quota = None
+        self._should_stop = False
 
     def get_cache_usage(self):
-        disk_usage = self._disk_usage.value
-        disk_quota = self._disk_quota.value
-
-        if disk_usage < 0:
-            # Disk usage still unknown
-            disk_usage = None
-
-        if disk_quota <= 0:
-            # No disk quota
-            disk_quota = None
-
-        return _CASCacheUsage(disk_usage, disk_quota)
-
-    def release_resources(self):
-        # Simply terminate the subprocess, no cleanup required in the subprocess
-        self._subprocess.terminate()
+        # FIXME: remove this abstraction
+        return _CASCacheUsage(self._disk_usage, self._disk_quota)
 
-    def _subprocess_run(self):
-        # Reset SIGTERM in subprocess to default as no cleanup is necessary
-        signal.signal(signal.SIGTERM, signal.SIG_DFL)
+    def stop(self):
+        self._should_stop = True
 
-        disk_usage = self._disk_usage
-        disk_quota = self._disk_quota
+    def run(self):
         local_cas = self._connection.get_local_cas()
 
-        while True:
+        while not self._should_stop:
             try:
                 # Ask buildbox-casd for current value
                 request = local_cas_pb2.GetLocalDiskUsageRequest()
                 response = local_cas.GetLocalDiskUsage(request)
 
                 # Update values in shared memory
-                disk_usage.value = response.size_bytes
-                disk_quota.value = response.quota_bytes
+                self._disk_usage = response.size_bytes
+                self._disk_quota = response.quota_bytes
             except grpc.RpcError:
                 # Terminate loop when buildbox-casd becomes unavailable
                 break
 
             # Sleep until next refresh
-            time.sleep(_CACHE_USAGE_REFRESH)
+            for _ in range(_CACHE_USAGE_REFRESH * 10):
+                if self._should_stop:
+                    break
+                time.sleep(0.1)
 
 
 def _grouper(iterable, n):
diff --git a/src/buildstream/_cas/casdprocessmanager.py b/src/buildstream/_cas/casdprocessmanager.py
index 32e4cce..519ee19 100644
--- a/src/buildstream/_cas/casdprocessmanager.py
+++ b/src/buildstream/_cas/casdprocessmanager.py
@@ -17,6 +17,7 @@
 #
 
 import contextlib
+import threading
 import os
 import random
 import shutil
@@ -240,35 +241,44 @@ class CASDChannel:
         self._asset_fetch = None
         self._asset_push = None
         self._casd_pid = casd_pid
+        self._shutdown_requested = False
 
-    def _establish_connection(self):
-        assert self._casd_channel is None
+        self.lock = threading.Lock()
 
-        while not os.path.exists(self._socket_path):
-            # casd is not ready yet, try again after a 10ms delay,
-            # but don't wait for more than specified timeout period
-            if time.time() > self._start_time + _CASD_TIMEOUT:
-                raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
+    def _establish_connection(self):
+        with self.lock:
+            if self._casd_channel is not None:
+                return
 
-            # check that process is still alive
-            try:
-                proc = psutil.Process(self._casd_pid)
-                if proc.status() == psutil.STATUS_ZOMBIE:
-                    proc.wait()
+            while not os.path.exists(self._socket_path):
+                # casd is not ready yet, try again after a 10ms delay,
+                # but don't wait for more than specified timeout period
+                if time.time() > self._start_time + _CASD_TIMEOUT:
+                    raise CASCacheError("Timed out waiting for buildbox-casd to become ready")
 
-                if not proc.is_running():
+                # check that process is still alive
+                try:
+                    proc = psutil.Process(self._casd_pid)
+                    if proc.status() == psutil.STATUS_ZOMBIE:
+                        proc.wait()
+
+                    if not proc.is_running():
+                        if self._shutdown_requested:
+                            return
+                        raise CASCacheError("buildbox-casd process died before connection could be established")
+                except psutil.NoSuchProcess:
+                    if self._shutdown_requested:
+                        return
                     raise CASCacheError("buildbox-casd process died before connection could be established")
-            except psutil.NoSuchProcess:
-                raise CASCacheError("buildbox-casd process died before connection could be established")
 
-            time.sleep(0.01)
+                time.sleep(0.01)
 
-        self._casd_channel = grpc.insecure_channel(self._connection_string)
-        self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
-        self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
-        self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
-        self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel)
-        self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel)
+            self._casd_channel = grpc.insecure_channel(self._connection_string)
+            self._bytestream = bytestream_pb2_grpc.ByteStreamStub(self._casd_channel)
+            self._casd_cas = remote_execution_pb2_grpc.ContentAddressableStorageStub(self._casd_channel)
+            self._local_cas = local_cas_pb2_grpc.LocalContentAddressableStorageStub(self._casd_channel)
+            self._asset_fetch = remote_asset_pb2_grpc.FetchStub(self._casd_channel)
+            self._asset_push = remote_asset_pb2_grpc.PushStub(self._casd_channel)
 
     # get_cas():
     #
@@ -284,12 +294,12 @@ class CASDChannel:
     # Return LocalCAS stub for buildbox-casd channel.
     #
     def get_local_cas(self):
-        if self._casd_channel is None:
+        if self._local_cas is None:
             self._establish_connection()
         return self._local_cas
 
     def get_bytestream(self):
-        if self._casd_channel is None:
+        if self._bytestream is None:
             self._establish_connection()
         return self._bytestream
 
@@ -323,12 +333,15 @@ class CASDChannel:
     # Close the casd channel.
     #
     def close(self):
-        if self.is_closed():
-            return
-        self._asset_push = None
-        self._asset_fetch = None
-        self._local_cas = None
-        self._casd_cas = None
-        self._bytestream = None
-        self._casd_channel.close()
-        self._casd_channel = None
+        with self.lock:
+            self._shutdown_requested = True
+
+            if self.is_closed():
+                return
+            self._asset_push = None
+            self._asset_fetch = None
+            self._local_cas = None
+            self._casd_cas = None
+            self._bytestream = None
+            self._casd_channel.close()
+            self._casd_channel = None
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 8b55915..7a258f5 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -552,17 +552,3 @@ class Context:
                 log_directory=self.logdir,
             )
         return self._cascache
-
-    # prepare_fork():
-    #
-    # Prepare this process for fork without exec. This is a safeguard against
-    # fork issues with multiple threads and gRPC connections.
-    #
-    def prepare_fork(self):
-        # gRPC channels must be closed before fork.
-        for cache in [self._cascache, self._artifactcache, self._sourcecache]:
-            if cache:
-                cache.close_grpc_channels()
-
-        # Do not allow fork if there are background threads.
-        return utils._is_single_threaded()
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 3a32a24..222b05d 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -19,10 +19,10 @@
 
 import os
 import datetime
+import threading
 from contextlib import contextmanager
 
 from . import _signals
-from . import utils
 from ._exceptions import BstError
 from ._message import Message, MessageType
 
@@ -48,15 +48,17 @@ class _TimeData:
 
 class Messenger:
     def __init__(self):
-        self._message_handler = None
-        self._silence_scope_depth = 0
-        self._log_handle = None
-        self._log_filename = None
         self._state = None
         self._next_render = None  # A Time object
         self._active_simple_tasks = 0
         self._render_status_cb = None
 
+        self._locals = threading.local()
+        self._locals.message_handler = None
+        self._locals.log_handle = None
+        self._locals.log_filename = None
+        self._locals.silence_scope_depth = 0
+
     # set_message_handler()
     #
     # Sets the handler for any status messages propagated through
@@ -70,7 +72,7 @@ class Messenger:
     #   ) -> None
     #
     def set_message_handler(self, handler):
-        self._message_handler = handler
+        self._locals.message_handler = handler
 
     # set_state()
     #
@@ -101,7 +103,7 @@ class Messenger:
     #    (bool): Whether messages are currently being silenced
     #
     def _silent_messages(self):
-        return self._silence_scope_depth > 0
+        return self._locals.silence_scope_depth > 0
 
     # message():
     #
@@ -112,16 +114,15 @@ class Messenger:
     #    message: A Message object
     #
     def message(self, message):
-
         # If we are recording messages, dump a copy into the open log file.
         self._record_message(message)
 
         # Send it off to the log handler (can be the frontend,
         # or it can be the child task which will propagate
         # to the frontend)
-        assert self._message_handler
+        assert self._locals.message_handler
 
-        self._message_handler(message, is_silenced=self._silent_messages())
+        self._locals.message_handler(message, is_silenced=self._silent_messages())
 
     # silence()
     #
@@ -141,12 +142,12 @@ class Messenger:
             yield
             return
 
-        self._silence_scope_depth += 1
+        self._locals.silence_scope_depth += 1
         try:
             yield
         finally:
-            assert self._silence_scope_depth > 0
-            self._silence_scope_depth -= 1
+            assert self._locals.silence_scope_depth > 0
+            self._locals.silence_scope_depth -= 1
 
     # timed_activity()
     #
@@ -265,22 +266,21 @@ class Messenger:
     #
     @contextmanager
     def recorded_messages(self, filename, logdir):
-
         # We dont allow recursing in this context manager, and
         # we also do not allow it in the main process.
-        assert self._log_handle is None
-        assert self._log_filename is None
-        assert not utils._is_main_process()
+        assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
+        assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
 
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
-        self._log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+        self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+        self._locals.silence_scope_depth = 0
 
         # Ensure the directory exists first
-        directory = os.path.dirname(self._log_filename)
+        directory = os.path.dirname(self._locals.log_filename)
         os.makedirs(directory, exist_ok=True)
 
-        with open(self._log_filename, "a") as logfile:
+        with open(self._locals.log_filename, "a") as logfile:
 
             # Write one last line to the log and flush it to disk
             def flush_log():
@@ -295,12 +295,12 @@ class Messenger:
                 except RuntimeError:
                     os.fsync(logfile.fileno())
 
-            self._log_handle = logfile
+            self._locals.log_handle = logfile
             with _signals.terminator(flush_log):
-                yield self._log_filename
+                yield self._locals.log_filename
 
-            self._log_handle = None
-            self._log_filename = None
+            self._locals.log_handle = None
+            self._locals.log_filename = None
 
     # get_log_handle()
     #
@@ -312,7 +312,7 @@ class Messenger:
     #     (file): The active logging file handle, or None
     #
     def get_log_handle(self):
-        return self._log_handle
+        return self._locals.log_handle
 
     # get_log_filename()
     #
@@ -324,7 +324,7 @@ class Messenger:
     #     (str): The active logging filename, or None
     #
     def get_log_filename(self):
-        return self._log_filename
+        return self._locals.log_filename
 
     # _record_message()
     #
@@ -335,7 +335,7 @@ class Messenger:
     #
     def _record_message(self, message):
 
-        if self._log_handle is None:
+        if self._locals.log_handle is None:
             return
 
         INDENT = "    "
@@ -372,8 +372,8 @@ class Messenger:
         )
 
         # Write to the open log file
-        self._log_handle.write("{}\n".format(text))
-        self._log_handle.flush()
+        self._locals.log_handle.write("{}\n".format(text))
+        self._locals.log_handle.flush()
 
     # _render_status()
     #
diff --git a/src/buildstream/_remote.py b/src/buildstream/_remote.py
index d8b8e68..6d52ff5 100644
--- a/src/buildstream/_remote.py
+++ b/src/buildstream/_remote.py
@@ -16,6 +16,7 @@
 #
 
 import os
+import threading
 from collections import namedtuple
 from urllib.parse import urlparse
 
@@ -146,41 +147,44 @@ class BaseRemote:
         self.push = spec.push
         self.url = spec.url
 
+        self._lock = threading.Lock()
+
     # init():
     #
     # Initialize the given remote. This function must be called before
     # any communication is performed, since such will otherwise fail.
     #
     def init(self):
-        if self._initialized:
-            return
-
-        # Set up the communcation channel
-        url = urlparse(self.spec.url)
-        if url.scheme == "http":
-            port = url.port or 80
-            self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
-        elif url.scheme == "https":
-            port = url.port or 443
-            try:
-                server_cert, client_key, client_cert = _read_files(
-                    self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+        with self._lock:
+            if self._initialized:
+                return
+
+            # Set up the communcation channel
+            url = urlparse(self.spec.url)
+            if url.scheme == "http":
+                port = url.port or 80
+                self.channel = grpc.insecure_channel("{}:{}".format(url.hostname, port))
+            elif url.scheme == "https":
+                port = url.port or 443
+                try:
+                    server_cert, client_key, client_cert = _read_files(
+                        self.spec.server_cert, self.spec.client_key, self.spec.client_cert
+                    )
+                except FileNotFoundError as e:
+                    raise RemoteError("Could not read certificates: {}".format(e)) from e
+                self.server_cert = server_cert
+                self.client_key = client_key
+                self.client_cert = client_cert
+                credentials = grpc.ssl_channel_credentials(
+                    root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
                 )
-            except FileNotFoundError as e:
-                raise RemoteError("Could not read certificates: {}".format(e)) from e
-            self.server_cert = server_cert
-            self.client_key = client_key
-            self.client_cert = client_cert
-            credentials = grpc.ssl_channel_credentials(
-                root_certificates=self.server_cert, private_key=self.client_key, certificate_chain=self.client_cert
-            )
-            self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
-        else:
-            raise RemoteError("Unsupported URL: {}".format(self.spec.url))
+                self.channel = grpc.secure_channel("{}:{}".format(url.hostname, port), credentials)
+            else:
+                raise RemoteError("Unsupported URL: {}".format(self.spec.url))
 
-        self._configure_protocols()
+            self._configure_protocols()
 
-        self._initialized = True
+            self._initialized = True
 
     def __enter__(self):
         return self
diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_scheduler/_multiprocessing.py
deleted file mode 100644
index 4864e14..0000000
--- a/src/buildstream/_scheduler/_multiprocessing.py
+++ /dev/null
@@ -1,79 +0,0 @@
-#
-#  Copyright (C) 2019 Bloomberg Finance LP
-#
-#  This program is free software; you can redistribute it and/or
-#  modify it under the terms of the GNU Lesser General Public
-#  License as published by the Free Software Foundation; either
-#  version 2 of the License, or (at your option) any later version.
-#
-#  This library is distributed in the hope that it will be useful,
-#  but WITHOUT ANY WARRANTY; without even the implied warranty of
-#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
-#  Lesser General Public License for more details.
-#
-#  You should have received a copy of the GNU Lesser General Public
-#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
-#
-
-# TLDR:
-# ALWAYS use `.AsyncioSafeProcess` when you have an asyncio event loop running and need a `multiprocessing.Process`
-#
-#
-# The upstream asyncio library doesn't play well with forking subprocesses while an event loop is running.
-#
-# The main problem that affects us is that the parent and the child will share some file handlers.
-# The most important one for us is the sig_handler_fd, which the loop uses to buffer signals received
-# by the app so that the asyncio loop can treat them afterwards.
-#
-# This sharing means that when we send a signal to the child, the sighandler in the child will write
-# it back to the parent sig_handler_fd, making the parent have to treat it too.
-# This is a problem for example when we sigterm the process. The scheduler will send sigterms to all its children,
-# which in turn will make the scheduler receive N SIGTERMs (one per child). Which in turn will send sigterms to
-# the children...
-#
-# We therefore provide a `AsyncioSafeProcess` derived from multiprocessing.Process  that automatically
-# tries to cleanup the loop and never calls `waitpid` on the child process, which breaks our child watchers.
-#
-#
-# Relevant issues:
-#  - Asyncio: support fork (https://bugs.python.org/issue21998)
-#  - Asyncio: support multiprocessing (support fork) (https://bugs.python.org/issue22087)
-#  - Signal delivered to a subprocess triggers parent's handler (https://bugs.python.org/issue31489)
-#
-#
-
-import multiprocessing
-import signal
-import sys
-from asyncio import set_event_loop_policy
-
-
-# _AsyncioSafeForkAwareProcess()
-#
-# Process class that doesn't call waitpid on its own.
-# This prevents conflicts with the asyncio child watcher.
-#
-# Also automatically close any running asyncio loop before calling
-# the actual run target
-#
-class _AsyncioSafeForkAwareProcess(multiprocessing.Process):
-    # pylint: disable=attribute-defined-outside-init
-    def start(self):
-        self._popen = self._Popen(self)
-        self._sentinel = self._popen.sentinel
-
-    def run(self):
-        signal.set_wakeup_fd(-1)
-        set_event_loop_policy(None)
-
-        super().run()
-
-
-if sys.platform != "win32":
-    # Set the default event loop policy to automatically close our asyncio loop in child processes
-    AsyncioSafeProcess = _AsyncioSafeForkAwareProcess
-
-else:
-    # Windows doesn't support ChildWatcher that way anyways, we'll need another
-    # implementation if we want it
-    AsyncioSafeProcess = multiprocessing.Process
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 7ea87dc..a4ace41 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,17 +24,13 @@
 import asyncio
 import datetime
 import multiprocessing
-import os
-import signal
-import sys
 import traceback
 
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ... import _signals
 
 
 # Return code values shutdown of job handling child processes
@@ -130,7 +126,6 @@ class Job:
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
         self._pipe_r = None  # The read end of a pipe for message passing
-        self._process = None  # The Process object
         self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
@@ -143,6 +138,8 @@ class Job:
         self._message_element_key = None  # The element key for messaging
         self._element = None  # The Element() passed to the Job() constructor, if applicable
 
+        self._task = None  # The task that is run
+
     # set_name()
     #
     # Sets the name of this job
@@ -157,11 +154,13 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
+        # FIXME: remove this, this is not necessary when using asyncio
         self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
 
         self._tries += 1
         self._parent_start_listening()
 
+        # FIXME: remove the parent/child separation, it's not needed anymore.
         child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
             self.action_name,
             self._messenger,
@@ -173,26 +172,18 @@ class Job:
             self._message_element_key,
         )
 
-        self._process = _multiprocessing.AsyncioSafeProcess(target=child_job.child_action, args=[pipe_w],)
-
-        # Block signals which are handled in the main process such that
-        # the child process does not inherit the parent's state, but the main
-        # process will be notified of any signal after we launch the child.
-        #
-        with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
-            with asyncio.get_child_watcher() as watcher:
-                self._process.start()
-                # Register the process to call `_parent_child_completed` once it is done
-
-                # Close the write end of the pipe in the parent
-                pipe_w.close()
+        loop = asyncio.get_event_loop()
 
-                # Here we delay the call to the next loop tick. This is in order to be running
-                # in the main thread, as the callback itself must be thread safe.
-                def on_completion(pid, returncode):
-                    asyncio.get_event_loop().call_soon(self._parent_child_completed, pid, returncode)
+        async def execute():
+            try:
+                result = await loop.run_in_executor(None, child_job.child_action, pipe_w)
+            except asyncio.CancelledError:
+                result = _ReturnCode.TERMINATED
+            except Exception:  # pylint: disable=broad-except
+                result = _ReturnCode.FAIL
+            await self._parent_child_completed(result)
 
-                watcher.add_child_handler(self._process.pid, on_completion)
+        self._task = loop.create_task(execute())
 
     # terminate()
     #
@@ -201,18 +192,14 @@ class Job:
     # This will send a SIGTERM signal to the Job process.
     #
     def terminate(self):
-
-        # First resume the job if it's suspended
-        self.resume(silent=True)
-
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the pipe
         self._parent_stop_listening()
 
         # Terminate the process using multiprocessing API pathway
-        if self._process:
-            self._process.terminate()
+        if self._task:
+            self._task.cancel()
 
         self._terminated = True
 
@@ -226,51 +213,6 @@ class Job:
     def get_terminated(self):
         return self._terminated
 
-    # kill()
-    #
-    # Forcefully kill the process, and any children it might have.
-    #
-    def kill(self):
-        # Force kill
-        self.message(MessageType.WARN, "{} did not terminate gracefully, killing".format(self.action_name))
-        if self._process:
-            utils._kill_process_tree(self._process.pid)
-
-    # suspend()
-    #
-    # Suspend this job.
-    #
-    def suspend(self):
-        if not self._suspended:
-            self.message(MessageType.STATUS, "{} suspending".format(self.action_name))
-
-            try:
-                # Use SIGTSTP so that child processes may handle and propagate
-                # it to processes they start that become session leaders.
-                os.kill(self._process.pid, signal.SIGTSTP)
-
-                # For some reason we receive exactly one suspend event for
-                # every SIGTSTP we send to the child process, even though the
-                # child processes are setsid(). We keep a count of these so we
-                # can ignore them in our event loop suspend_event().
-                self._scheduler.internal_stops += 1
-                self._suspended = True
-            except ProcessLookupError:
-                # ignore, process has already exited
-                pass
-
-    # resume()
-    #
-    # Resume this suspended job.
-    #
-    def resume(self, silent=False):
-        if self._suspended:
-            if not silent and not self._scheduler.terminated:
-                self.message(MessageType.STATUS, "{} resuming".format(self.action_name))
-
-            os.kill(self._process.pid, signal.SIGCONT)
-            self._suspended = False
-
     # set_message_element_name()
     #
     # This is called by Job subclasses to set the plugin instance element
@@ -397,10 +339,9 @@ class Job:
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
     #
     # Args:
-    #    pid (int): The PID of the child which completed
     #    returncode (int): The return code of the child process
     #
-    def _parent_child_completed(self, pid, returncode):
+    async def _parent_child_completed(self, returncode):
         self._parent_shutdown()
 
         try:
@@ -431,16 +372,16 @@ class Job:
             status = JobStatus.FAIL
         elif returncode == _ReturnCode.TERMINATED:
             if self._terminated:
-                self.message(MessageType.INFO, "Process was terminated")
+                self.message(MessageType.INFO, "Job terminated")
             else:
-                self.message(MessageType.ERROR, "Process was terminated unexpectedly")
+                self.message(MessageType.ERROR, "Job was terminated unexpectedly")
 
             status = JobStatus.FAIL
         elif returncode == _ReturnCode.KILLED:
             if self._terminated:
-                self.message(MessageType.INFO, "Process was killed")
+                self.message(MessageType.INFO, "Job was killed")
             else:
-                self.message(MessageType.ERROR, "Process was killed unexpectedly")
+                self.message(MessageType.ERROR, "Job was killed unexpectedly")
 
             status = JobStatus.FAIL
         else:
@@ -451,7 +392,7 @@ class Job:
 
         # Force the deletion of the pipe and process objects to try and clean up FDs
         self._pipe_r.close()
-        self._pipe_r = self._process = None
+        self._pipe_r = self._task = None
 
     # _parent_process_envelope()
     #
@@ -655,19 +596,6 @@ class ChildJob:
     #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
     #
     def child_action(self, pipe_w):
-
-        # This avoids some SIGTSTP signals from grandchildren
-        # getting propagated up to the master process
-        os.setsid()
-
-        # First set back to the default signal handlers for the signals
-        # we handle, and then clear their blocked state.
-        #
-        signal_list = [signal.SIGTSTP, signal.SIGTERM]
-        for sig in signal_list:
-            signal.signal(sig, signal.SIG_DFL)
-        signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
-
         # Assign the pipe we passed across the process boundaries
         #
         # Set the global message handler in this child
@@ -687,16 +615,11 @@ class ChildJob:
             nonlocal starttime
             starttime += datetime.datetime.now() - stopped_time
 
-        # Graciously handle sigterms.
-        def handle_sigterm():
-            self._child_shutdown(_ReturnCode.TERMINATED)
-
         # Time, log and and run the action function
         #
-        with _signals.terminator(handle_sigterm), _signals.suspendable(
-            stop_time, resume_time
-        ), self._messenger.recorded_messages(self._logfile, self._logdir) as filename:
-
+        with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+            self._logfile, self._logdir
+        ) as filename:
             self.message(MessageType.START, self.action_name, logfile=filename)
 
             try:
@@ -707,7 +630,7 @@ class ChildJob:
                 self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
 
                 # Alert parent of skip by return code
-                self._child_shutdown(_ReturnCode.SKIPPED)
+                return _ReturnCode.SKIPPED
             except BstError as e:
                 elapsed = datetime.datetime.now() - starttime
                 retry_flag = e.temporary
@@ -731,7 +654,7 @@ class ChildJob:
 
                 # Set return code based on whether or not the error was temporary.
                 #
-                self._child_shutdown(_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL)
+                return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
 
             except Exception:  # pylint: disable=broad-except
 
@@ -744,7 +667,7 @@ class ChildJob:
 
                 self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
                 # Unhandled exceptions should permenantly fail
-                self._child_shutdown(_ReturnCode.PERM_FAIL)
+                return _ReturnCode.PERM_FAIL
 
             else:
                 # No exception occurred in the action
@@ -757,7 +680,9 @@ class ChildJob:
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
                 # is already busy in sys.exit()
-                self._child_shutdown(_ReturnCode.OK)
+                return _ReturnCode.OK
+            finally:
+                self._pipe_w.close()
 
     #######################################################
     #                  Local Private Methods              #
@@ -809,18 +734,6 @@ class ChildJob:
         if result is not None:
             self._send_message(_MessageType.RESULT, result)
 
-    # _child_shutdown()
-    #
-    # Shuts down the child process by cleaning up and exiting the process
-    #
-    # Args:
-    #    exit_code (_ReturnCode): The exit code to exit with
-    #
-    def _child_shutdown(self, exit_code):
-        self._pipe_w.close()
-        assert isinstance(exit_code, _ReturnCode)
-        sys.exit(exit_code.value)
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3e6bf1f..7380f07 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -20,12 +20,14 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # System imports
+import functools
 import os
 import asyncio
 from itertools import chain
 import signal
 import datetime
 import sys
+from concurrent.futures import ThreadPoolExecutor
 
 # Local imports
 from .resources import Resources
@@ -34,9 +36,7 @@ from ..types import FastEnum
 from .._profile import Topics, PROFILER
 from .._message import Message, MessageType
 from ..plugin import Plugin
-
-
-_MAX_TIMEOUT_TO_KILL_CHILDREN = 20  # in seconds
+from .. import _signals
 
 
 # A decent return code for Scheduler.run()
@@ -46,6 +46,23 @@ class SchedStatus(FastEnum):
     TERMINATED = 1
 
 
+def reset_signals_on_exit(func):
+    @functools.wraps(func)
+    def wrapper(*args, **kwargs):
+        orig_sigint = signal.getsignal(signal.SIGINT)
+        orig_sigterm = signal.getsignal(signal.SIGTERM)
+        orig_sigtstp = signal.getsignal(signal.SIGTSTP)
+
+        try:
+            return func(*args, **kwargs)
+        finally:
+            signal.signal(signal.SIGINT, orig_sigint)
+            signal.signal(signal.SIGTERM, orig_sigterm)
+            signal.signal(signal.SIGTSTP, orig_sigtstp)
+
+    return wrapper
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -79,7 +96,6 @@ class Scheduler:
 
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None  # Shared for Job access to observe the message queue
-        self.internal_stops = 0  # Amount of SIGSTP signals we've introduced, this is shared with job.py
 
         #
         # Private members
@@ -113,6 +129,7 @@ class Scheduler:
     # elements have been processed by each queue or when
     # an error arises
     #
+    @reset_signals_on_exit
     def run(self, queues, casd_process_manager):
 
         # Hold on to the queues to process
@@ -149,10 +166,14 @@ class Scheduler:
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
-            # Run the queues
-            self._sched()
-            self.loop.run_forever()
-            self.loop.close()
+            # FIXME: this should be done in a cleaner way
+            with _signals.suspendable(lambda: None, lambda: None), _signals.terminator(lambda: None):
+                with ThreadPoolExecutor(max_workers=sum(self.resources._max_resources.values())) as pool:
+                    self.loop.set_default_executor(pool)
+                    # Run the queues
+                    self._sched()
+                    self.loop.run_forever()
+                    self.loop.close()
 
         # Stop watching casd
         _watcher.remove_child_handler(self._casd_process.pid)
@@ -348,13 +369,6 @@ class Scheduler:
             # If that happens, do another round.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        # Make sure fork is allowed before starting jobs
-        if not self.context.prepare_fork():
-            message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
-            self.context.messenger.message(message)
-            self.terminate()
-            return
-
         # Start the jobs
         #
         for job in ready:
@@ -412,9 +426,9 @@ class Scheduler:
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            # Notify that we're suspended
-            for job in self._active_jobs:
-                job.suspend()
+
+            for suspender in reversed(_signals.suspendable_stack):
+                suspender.suspend()
 
     # _resume_jobs()
     #
@@ -422,8 +436,9 @@ class Scheduler:
     #
     def _resume_jobs(self):
         if self.suspended:
-            for job in self._active_jobs:
-                job.resume()
+            for suspender in _signals.suspendable_stack:
+                suspender.resume()
+
             self.suspended = False
             # Notify that we're unsuspended
             self._state.offset_start_time(datetime.datetime.now() - self._suspendtime)
@@ -455,12 +470,6 @@ class Scheduler:
     # A loop registered event callback for SIGTSTP
     #
     def _suspend_event(self):
-
-        # Ignore the feedback signals from Job.suspend()
-        if self.internal_stops:
-            self.internal_stops -= 1
-            return
-
         # No need to care if jobs were suspended or not, we _only_ handle this
         # while we know jobs are not suspended.
         self._suspend_jobs()
@@ -482,12 +491,8 @@ class Scheduler:
         self.loop.remove_signal_handler(signal.SIGTERM)
 
     def _terminate_jobs_real(self):
-        def kill_jobs():
-            for job_ in self._active_jobs:
-                job_.kill()
-
-        # Schedule all jobs to be killed if they have not exited after timeout
-        self.loop.call_later(_MAX_TIMEOUT_TO_KILL_CHILDREN, kill_jobs)
+        for terminator in _signals.terminator_stack.copy():
+            terminator()
 
         for job in self._active_jobs:
             job.terminate()
diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index 7c2da1c..b9ca919 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -259,6 +259,11 @@ class DownloadableFileSource(Source):
 
         return self.__default_mirror_file
 
+    @classmethod
+    def _reset_url_opener(cls):
+        # Needed for tests, in order to cleanup the `netrc` configuration.
+        cls.__urlopener = None
+
     def __get_urlopener(self):
         if not DownloadableFileSource.__urlopener:
             try:
diff --git a/src/buildstream/source.py b/src/buildstream/source.py
index ea77a65..23c7a2f 100644
--- a/src/buildstream/source.py
+++ b/src/buildstream/source.py
@@ -986,19 +986,11 @@ class Source(Plugin):
         clean = node.strip_node_info()
         to_modify = node.strip_node_info()
 
-        current_ref = self.get_ref()  # pylint: disable=assignment-from-no-return
-
         # Set the ref regardless of whether it changed, the
         # TrackQueue() will want to update a specific node with
         # the ref, regardless of whether the original has changed.
         self.set_ref(new_ref, to_modify)
 
-        if current_ref == new_ref or not save:
-            # Note: We do not look for and propagate changes at this point
-            # which might result in desync depending if something changes about
-            # tracking in the future.  For now, this is quite safe.
-            return False
-
         actions = {}
         for k, v in clean.items():
             if k not in to_modify:
diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 5d1c1d2..520f685 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -42,3 +42,4 @@ def thread_check(default_thread_number):
 @pytest.fixture(autouse=True)
 def reset_global_node_state():
     node._reset_global_state()
+    DownloadableFileSource._reset_url_opener()
diff --git a/tests/internals/cascache.py b/tests/internals/cascache.py
index 043531c..e27e409 100644
--- a/tests/internals/cascache.py
+++ b/tests/internals/cascache.py
@@ -3,6 +3,7 @@ import time
 from unittest.mock import MagicMock
 
 from buildstream._cas.cascache import CASCache
+from buildstream._cas import casdprocessmanager
 from buildstream._message import MessageType
 from buildstream._messenger import Messenger
 
@@ -31,6 +32,10 @@ def test_report_when_cascache_exits_not_cleanly(tmp_path, monkeypatch):
     dummy_buildbox_casd.write_text("#!/usr/bin/env sh\nwhile :\ndo\nsleep 60\ndone")
     dummy_buildbox_casd.chmod(0o777)
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack, we should instead have a socket be created nicely
+    #        on the fake casd script. This whole test suite probably would
+    #        need some cleanup
+    monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
     cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))
@@ -50,6 +55,10 @@ def test_report_when_cascache_is_forcefully_killed(tmp_path, monkeypatch):
     dummy_buildbox_casd.write_text("#!/usr/bin/env sh\ntrap 'echo hello' TERM\nwhile :\ndo\nsleep 60\ndone")
     dummy_buildbox_casd.chmod(0o777)
     monkeypatch.setenv("PATH", str(tmp_path), prepend=os.pathsep)
+    # FIXME: this is a hack, we should instead have a socket be created nicely
+    #        on the fake casd script. This whole test suite probably would
+    #        need some cleanup
+    monkeypatch.setattr(casdprocessmanager, "_CASD_TIMEOUT", 0.1)
 
     messenger = MagicMock(spec_set=Messenger)
     cache = CASCache(str(tmp_path.joinpath("casd")), casd=True, log_directory=str(tmp_path.joinpath("logs")))


[buildstream] 01/13: _signals.py: allow calling signal handler from non-main threads

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8d91b9739417b00de6525e7c0c49f745880c5c63
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jun 17 20:57:37 2020 +0000

    _signals.py: allow calling signal handler from non-main threads
    
    This modifies the signal terminator so that it can be called from any
    thread.
    
    This checks that either:
    
    - The signal handler is already in place
    - Or the caller is in the main thread, which allows the signal handler
      to be set.
    
    This also removes the exact callback that was added instead of removing
    the last one, and fixes the `suspend_handler` to do the same.
    
    This is required because we don't know how the calls will interleave,
    so we can't guarantee that the last callback on the stack is the right
    one to remove.
---
 src/buildstream/_cas/casserver.py |  7 ++++++-
 src/buildstream/_signals.py       | 13 ++++++-------
 2 files changed, 12 insertions(+), 8 deletions(-)

diff --git a/src/buildstream/_cas/casserver.py b/src/buildstream/_cas/casserver.py
index 013fb07..04c5eb8 100644
--- a/src/buildstream/_cas/casserver.py
+++ b/src/buildstream/_cas/casserver.py
@@ -30,6 +30,7 @@ import grpc
 import click
 
 from .._protos.build.bazel.remote.asset.v1 import remote_asset_pb2_grpc
+from .. import _signals
 from .._protos.build.bazel.remote.execution.v2 import (
     remote_execution_pb2,
     remote_execution_pb2_grpc,
@@ -137,7 +138,11 @@ def create_server(repo, *, enable_push, quota, index_only, log_level=LogLevel.Le
             _ReferenceStorageServicer(casd_channel, root, enable_push=enable_push), server
         )
 
-        yield server
+        # Ensure we have the signal handler set for SIGTERM
+        # This allows threads from GRPC to call our methods that do register
+        # handlers at exit.
+        with _signals.terminator(lambda: None):
+            yield server
 
     finally:
         casd_channel.close()
diff --git a/src/buildstream/_signals.py b/src/buildstream/_signals.py
index 03b55b0..1edd445 100644
--- a/src/buildstream/_signals.py
+++ b/src/buildstream/_signals.py
@@ -80,13 +80,10 @@ def terminator_handler(signal_, frame):
 def terminator(terminate_func):
     global terminator_stack  # pylint: disable=global-statement
 
-    # Signal handling only works in the main thread
-    if threading.current_thread() != threading.main_thread():
-        yield
-        return
-
     outermost = bool(not terminator_stack)
 
+    assert threading.current_thread() == threading.main_thread() or not outermost
+
     terminator_stack.append(terminate_func)
     if outermost:
         original_handler = signal.signal(signal.SIGTERM, terminator_handler)
@@ -96,7 +93,7 @@ def terminator(terminate_func):
     finally:
         if outermost:
             signal.signal(signal.SIGTERM, original_handler)
-        terminator_stack.pop()
+        terminator_stack.remove(terminate_func)
 
 
 # Just a simple object for holding on to two callbacks
@@ -146,6 +143,8 @@ def suspendable(suspend_callback, resume_callback):
     global suspendable_stack  # pylint: disable=global-statement
 
     outermost = bool(not suspendable_stack)
+    assert threading.current_thread() == threading.main_thread() or not outermost
+
     suspender = Suspender(suspend_callback, resume_callback)
     suspendable_stack.append(suspender)
 
@@ -158,7 +157,7 @@ def suspendable(suspend_callback, resume_callback):
         if outermost:
             signal.signal(signal.SIGTSTP, original_stop)
 
-        suspendable_stack.pop()
+        suspendable_stack.remove(suspender)
 
 
 # blocked()


[buildstream] 04/13: _messenger.py: Make `timed_suspendable` public and use it in job.py

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 349f97d53817a992a5bc744b29a9012f9f2b7b77
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Sat Jul 4 12:13:24 2020 +0000

    _messenger.py: Make `timed_suspendable` public and use it in job.py
    
    This reduces the amount of code duplication
---
 src/buildstream/_messenger.py          | 60 +++++++++++++++++-----------------
 src/buildstream/_scheduler/jobs/job.py | 23 +++----------
 2 files changed, 35 insertions(+), 48 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 222b05d..84bea6a 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -161,7 +161,7 @@ class Messenger:
     #
     @contextmanager
     def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
-        with self._timed_suspendable() as timedata:
+        with self.timed_suspendable() as timedata:
             try:
                 # Push activity depth for status messages
                 message = Message(MessageType.START, activity_name, detail=detail, element_name=element_name)
@@ -205,7 +205,7 @@ class Messenger:
         if not full_name:
             full_name = activity_name
 
-        with self._timed_suspendable() as timedata:
+        with self.timed_suspendable() as timedata:
             try:
                 message = Message(MessageType.START, activity_name, element_name=element_name)
                 self.message(message)
@@ -326,6 +326,34 @@ class Messenger:
     def get_log_filename(self):
         return self._locals.log_filename
 
+    # timed_suspendable()
+    #
+    # A contextmanager that allows an activity to be suspended and can
+    # adjust for clock drift caused by suspending
+    #
+    # Yields:
+    #    TimeData: An object that contains the time the activity started
+    #
+    @contextmanager
+    def timed_suspendable(self):
+        # Note: timedata needs to be in a namedtuple so that values can be
+        # yielded that will change
+        timedata = _TimeData(start_time=datetime.datetime.now())
+        stopped_time = None
+
+        def stop_time():
+            nonlocal stopped_time
+            stopped_time = datetime.datetime.now()
+
+        def resume_time():
+            nonlocal timedata
+            nonlocal stopped_time
+            sleep_time = datetime.datetime.now() - stopped_time
+            timedata.start_time += sleep_time
+
+        with _signals.suspendable(stop_time, resume_time):
+            yield timedata
+
     # _record_message()
     #
     # Records the message if recording is enabled
@@ -388,31 +416,3 @@ class Messenger:
         if self._render_status_cb and now >= self._next_render:
             self._render_status_cb()
             self._next_render = now + _RENDER_INTERVAL
-
-    # _timed_suspendable()
-    #
-    # A contextmanager that allows an activity to be suspended and can
-    # adjust for clock drift caused by suspending
-    #
-    # Yields:
-    #    TimeData: An object that contains the time the activity started
-    #
-    @contextmanager
-    def _timed_suspendable(self):
-        # Note: timedata needs to be in a namedtuple so that values can be
-        # yielded that will change
-        timedata = _TimeData(start_time=datetime.datetime.now())
-        stopped_time = None
-
-        def stop_time():
-            nonlocal stopped_time
-            stopped_time = datetime.datetime.now()
-
-        def resume_time():
-            nonlocal timedata
-            nonlocal stopped_time
-            sleep_time = datetime.datetime.now() - stopped_time
-            timedata.start_time += sleep_time
-
-        with _signals.suspendable(stop_time, resume_time):
-            yield timedata
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index a4ace41..03a6b61 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -30,7 +30,6 @@ import traceback
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
-from ... import _signals
 
 
 # Return code values shutdown of job handling child processes
@@ -603,21 +602,9 @@ class ChildJob:
         self._pipe_w = pipe_w
         self._messenger.set_message_handler(self._child_message_handler)
 
-        starttime = datetime.datetime.now()
-        stopped_time = None
-
-        def stop_time():
-            nonlocal stopped_time
-            stopped_time = datetime.datetime.now()
-
-        def resume_time():
-            nonlocal stopped_time
-            nonlocal starttime
-            starttime += datetime.datetime.now() - stopped_time
-
         # Time, log and and run the action function
         #
-        with _signals.suspendable(stop_time, resume_time), self._messenger.recorded_messages(
+        with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
             self._logfile, self._logdir
         ) as filename:
             self.message(MessageType.START, self.action_name, logfile=filename)
@@ -626,13 +613,13 @@ class ChildJob:
                 # Try the task action
                 result = self.child_process()  # pylint: disable=assignment-from-no-return
             except SkipJob as e:
-                elapsed = datetime.datetime.now() - starttime
+                elapsed = datetime.datetime.now() - timeinfo.start_time
                 self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
 
                 # Alert parent of skip by return code
                 return _ReturnCode.SKIPPED
             except BstError as e:
-                elapsed = datetime.datetime.now() - starttime
+                elapsed = datetime.datetime.now() - timeinfo.start_time
                 retry_flag = e.temporary
 
                 if retry_flag and (self._tries <= self._max_retries):
@@ -662,7 +649,7 @@ class ChildJob:
                 # send the traceback and formatted exception back to the frontend
                 # and print it to the log file.
                 #
-                elapsed = datetime.datetime.now() - starttime
+                elapsed = datetime.datetime.now() - timeinfo.start_time
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
 
                 self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
@@ -674,7 +661,7 @@ class ChildJob:
                 self._send_message(_MessageType.CHILD_DATA, self.child_process_data())
                 self._child_send_result(result)
 
-                elapsed = datetime.datetime.now() - starttime
+                elapsed = datetime.datetime.now() - timeinfo.start_time
                 self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
 
                 # Shutdown needs to stay outside of the above context manager,


[buildstream] 09/13: job.py: Don't pass the errors through the queue, we can set it directly

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8b72daf0e0712a876d977d8546ca04d856218e17
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 10:05:54 2020 +0000

    job.py: Don't pass the errors through the queue, we can set it directly
    
    We are now running in threads so we can set the global ourselves without
    having to go through all the current hoops
---
 src/buildstream/_scheduler/jobs/job.py | 28 ++--------------------------
 1 file changed, 2 insertions(+), 26 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 30308a9..5369ae3 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -67,7 +67,6 @@ class _Envelope:
 
 
 class _MessageType(FastEnum):
-    ERROR = 2
     RESULT = 3
 
 
@@ -387,12 +386,6 @@ class Job:
     def _parent_process_envelope(self, envelope):
         if not self._listening:
             return
-
-        if envelope.message_type is _MessageType.ERROR:
-            # For regression tests only, save the last error domain / reason
-            # reported from a child task in the main process, this global state
-            # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message["domain"], envelope.message["reason"])
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
@@ -566,8 +559,8 @@ class ChildJob:
                 else:
                     self.message(MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, sandbox=e.sandbox)
 
-                # Report the exception to the parent (for internal testing purposes)
-                self._child_send_error(e)
+                # Register the error for internal testing purposes
+                set_last_task_error(e.domain, e.reason)
 
                 # Set return code based on whether or not the error was temporary.
                 #
@@ -617,23 +610,6 @@ class ChildJob:
     def _send_message(self, message_type, message_data):
         self._pipe_w.send(_Envelope(message_type, message_data))
 
-    # _child_send_error()
-    #
-    # Sends an error to the main process through the message pipe
-    #
-    # Args:
-    #    e (Exception): The error to send
-    #
-    def _child_send_error(self, e):
-        domain = None
-        reason = None
-
-        if isinstance(e, BstError):
-            domain = e.domain
-            reason = e.reason
-
-        self._send_message(_MessageType.ERROR, {"domain": domain, "reason": reason})
-
     # _child_send_result()
     #
     # Sends the serialized result to the main process through the message pipe


[buildstream] 10/13: job.py: Completely remove the need for a queue between parent and child jobs

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 552a40b7c5179e686bb55403920719c6bf6481da
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 12:02:28 2020 +0000

    job.py: Completely remove the need for a queue between parent and child jobs
    
    We don't need that distinction anymore
---
 src/buildstream/_scheduler/jobs/job.py | 167 +++------------------------------
 1 file changed, 11 insertions(+), 156 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 5369ae3..d461d0f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -23,7 +23,6 @@
 # System imports
 import asyncio
 import datetime
-import multiprocessing
 import traceback
 
 # BuildStream toplevel imports
@@ -59,17 +58,6 @@ class JobStatus(FastEnum):
     SKIPPED = 3
 
 
-# Used to distinguish between status messages and return values
-class _Envelope:
-    def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
-
-
-class _MessageType(FastEnum):
-    RESULT = 3
-
-
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -119,9 +107,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
-        self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
         self._tries = 0  # Try count, for retryable jobs
@@ -148,11 +133,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -170,12 +151,12 @@ class Job:
 
         async def execute():
             try:
-                result = await loop.run_in_executor(None, child_job.child_action, pipe_w)
+                ret_code, self._result = await loop.run_in_executor(None, child_job.child_action)
             except asyncio.CancelledError:
-                result = _ReturnCode.TERMINATED
+                ret_code = _ReturnCode.TERMINATED
             except Exception:  # pylint: disable=broad-except
-                result = _ReturnCode.FAIL
-            await self._parent_child_completed(result)
+                ret_code = _ReturnCode.FAIL
+            await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
 
@@ -188,9 +169,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         # Terminate the process using multiprocessing API pathway
         if self._task:
             self._task.cancel()
@@ -302,16 +280,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -320,8 +288,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -368,69 +334,7 @@ class Job:
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
 
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_envelope()
-    #
-    # Processes a message Envelope deserialized form the message pipe.
-    #
-    # this will have the side effect of assigning some local state
-    # on the Job in the parent process for later inspection when the
-    # child process completes.
-    #
-    # Args:
-    #    envelope (Envelope): The message envelope
-    #
-    def _parent_process_envelope(self, envelope):
-        if not self._listening:
-            return
-        elif envelope.message_type is _MessageType.RESULT:
-            assert self._result is None
-            self._result = envelope.message
-        else:
-            assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                envelope = self._pipe_r.recv()
-            except EOFError:
-                self._parent_stop_listening()
-                break
-            self._parent_process_envelope(envelope)
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -471,8 +375,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
-
     # message():
     #
     # Logs a message, this will be logged in the task's logfile and
@@ -522,16 +424,7 @@ class ChildJob:
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
-        # Set the global message handler in this child
-        # process to forward messages to the parent process
-        self._pipe_w = pipe_w
-
+    def child_action(self):
         # Time, log and and run the action function
         #
         with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
@@ -547,7 +440,7 @@ class ChildJob:
                 self.message(MessageType.SKIPPED, str(e), elapsed=elapsed)
 
                 # Alert parent of skip by return code
-                return _ReturnCode.SKIPPED
+                return _ReturnCode.SKIPPED, None
             except BstError as e:
                 elapsed = datetime.datetime.now() - timeinfo.start_time
                 retry_flag = e.temporary
@@ -564,11 +457,11 @@ class ChildJob:
 
                 # Set return code based on whether or not the error was temporary.
                 #
-                return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+                return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None
 
             except Exception:  # pylint: disable=broad-except
 
-                # If an unhandled (not normalized to BstError) occurs, that's a bug,
+                # If an unhandled (not normalized to BstError) occurs, that's a `bug`,
                 # send the traceback and formatted exception back to the frontend
                 # and print it to the log file.
                 #
@@ -577,51 +470,13 @@ class ChildJob:
 
                 self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail)
                 # Unhandled exceptions should permenantly fail
-                return _ReturnCode.PERM_FAIL
+                return _ReturnCode.PERM_FAIL, None
 
             else:
-                # No exception occurred in the action
-                self._child_send_result(result)
-
                 elapsed = datetime.datetime.now() - timeinfo.start_time
                 self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed)
 
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
                 # is already busy in sys.exit()
-                return _ReturnCode.OK
-            finally:
-                self._pipe_w.close()
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
-
-    # _send_message()
-    #
-    # Send data in a message to the parent Job, running in the main process.
-    #
-    # Args:
-    #    message_type (str): The type of message to send.
-    #    message_data (any): A simple object (must be pickle-able, i.e.
-    #                        strings, lists, dicts, numbers, but not Element
-    #                        instances). This is sent to the parent Job.
-    #
-    def _send_message(self, message_type, message_data):
-        self._pipe_w.send(_Envelope(message_type, message_data))
-
-    # _child_send_result()
-    #
-    # Sends the serialized result to the main process through the message pipe
-    #
-    # Args:
-    #    result (any): None, or a simple object (must be pickle-able, i.e.
-    #                  strings, lists, dicts, numbers, but not Element
-    #                  instances).
-    #
-    # Note: If None is passed here, nothing needs to be sent, the
-    #       result member in the parent process will simply remain None.
-    #
-    def _child_send_result(self, result):
-        if result is not None:
-            self._send_message(_MessageType.RESULT, result)
+                return _ReturnCode.OK, result


[buildstream] 12/13: plugin.py: Add a helper to run blocking processes in subprocesses

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit d211abc07f5e56ad1f9e4a6a6b8804e67380e4fe
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Thu Jul 9 19:02:20 2020 +0100

    plugin.py: Add a helper to run blocking processes in subprocesses
    
    This ensures that we can cleanly clean up processes and threads on
    termination of BuildStream.
    
    Plugins should use this helper whenever there is a risk of them being
    blocked on a syscall for an indefinite amount of time.
---
 src/buildstream/downloadablefilesource.py | 97 +++++++++++++++++--------------
 src/buildstream/plugin.py                 | 28 ++++++++-
 2 files changed, 80 insertions(+), 45 deletions(-)

diff --git a/src/buildstream/downloadablefilesource.py b/src/buildstream/downloadablefilesource.py
index b9ca919..4875445 100644
--- a/src/buildstream/downloadablefilesource.py
+++ b/src/buildstream/downloadablefilesource.py
@@ -99,6 +99,34 @@ class _NetrcPasswordManager:
             return login, password
 
 
+def _download_file(opener, url, etag, directory):
+    default_name = os.path.basename(url)
+    request = urllib.request.Request(url)
+    request.add_header("Accept", "*/*")
+    request.add_header("User-Agent", "BuildStream/2")
+
+    if etag is not None:
+        request.add_header("If-None-Match", etag)
+
+    with contextlib.closing(opener.open(request)) as response:
+        info = response.info()
+
+        # some servers don't honor the 'If-None-Match' header
+        if etag and info["ETag"] == etag:
+            return None, None
+
+        etag = info["ETag"]
+
+        filename = info.get_filename(default_name)
+        filename = os.path.basename(filename)
+        local_file = os.path.join(directory, filename)
+        with open(local_file, "wb") as dest:
+            shutil.copyfileobj(response, dest)
+
+    return local_file, etag
+
+
+
 class DownloadableFileSource(Source):
     # pylint: disable=attribute-defined-outside-init
 
@@ -137,19 +165,18 @@ class DownloadableFileSource(Source):
         # there is no 'track' field in the source to determine what/whether
         # or not to update refs, because tracking a ref is always a conscious
         # decision by the user.
-        with self.timed_activity("Tracking {}".format(self.url), silent_nested=True):
-            new_ref = self._ensure_mirror()
+        new_ref = self._ensure_mirror("Tracking {}".format(self.url))
 
-            if self.ref and self.ref != new_ref:
-                detail = (
-                    "When tracking, new ref differs from current ref:\n"
-                    + "  Tracked URL: {}\n".format(self.url)
-                    + "  Current ref: {}\n".format(self.ref)
-                    + "  New ref: {}\n".format(new_ref)
-                )
-                self.warn("Potential man-in-the-middle attack!", detail=detail)
+        if self.ref and self.ref != new_ref:
+            detail = (
+                "When tracking, new ref differs from current ref:\n"
+                + "  Tracked URL: {}\n".format(self.url)
+                + "  Current ref: {}\n".format(self.ref)
+                + "  New ref: {}\n".format(new_ref)
+            )
+            self.warn("Potential man-in-the-middle attack!", detail=detail)
 
-            return new_ref
+        return new_ref
 
     def fetch(self):  # pylint: disable=arguments-differ
 
@@ -162,12 +189,11 @@ class DownloadableFileSource(Source):
 
         # Download the file, raise hell if the sha256sums don't match,
         # and mirror the file otherwise.
-        with self.timed_activity("Fetching {}".format(self.url), silent_nested=True):
-            sha256 = self._ensure_mirror()
-            if sha256 != self.ref:
-                raise SourceError(
-                    "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
-                )
+        sha256 = self._ensure_mirror("Fetching {}".format(self.url),)
+        if sha256 != self.ref:
+            raise SourceError(
+                "File downloaded from {} has sha256sum '{}', not '{}'!".format(self.url, sha256, self.ref)
+            )
 
     def _warn_deprecated_etag(self, node):
         etag = node.get_str("etag", None)
@@ -188,40 +214,23 @@ class DownloadableFileSource(Source):
         with utils.save_file_atomic(etagfilename) as etagfile:
             etagfile.write(etag)
 
-    def _ensure_mirror(self):
+    def _ensure_mirror(self, activity_name: str):
         # Downloads from the url and caches it according to its sha256sum.
         try:
             with self.tempdir() as td:
-                default_name = os.path.basename(self.url)
-                request = urllib.request.Request(self.url)
-                request.add_header("Accept", "*/*")
-                request.add_header("User-Agent", "BuildStream/2")
-
                 # We do not use etag in case what we have in cache is
                 # not matching ref in order to be able to recover from
                 # corrupted download.
-                if self.ref:
-                    etag = self._get_etag(self.ref)
-
+                if self.ref and not self.is_cached():
                     # Do not re-download the file if the ETag matches.
-                    if etag and self.is_cached():
-                        request.add_header("If-None-Match", etag)
-
-                opener = self.__get_urlopener()
-                with contextlib.closing(opener.open(request)) as response:
-                    info = response.info()
-
-                    # some servers don't honor the 'If-None-Match' header
-                    if self.ref and etag and info["ETag"] == etag:
-                        return self.ref
+                    etag = self._get_etag(self.ref)
+                else:
+                    etag = None
 
-                    etag = info["ETag"]
+                local_file, new_etag = self.blocking_activity(_download_file, (self.__get_urlopener(), self.url, etag, td), activity_name)
 
-                    filename = info.get_filename(default_name)
-                    filename = os.path.basename(filename)
-                    local_file = os.path.join(td, filename)
-                    with open(local_file, "wb") as dest:
-                        shutil.copyfileobj(response, dest)
+                if local_file is None:
+                    return self.ref
 
                 # Make sure url-specific mirror dir exists.
                 if not os.path.isdir(self._mirror_dir):
@@ -233,8 +242,8 @@ class DownloadableFileSource(Source):
                 # In case the old file was corrupted somehow.
                 os.rename(local_file, self._get_mirror_file(sha256))
 
-                if etag:
-                    self._store_etag(sha256, etag)
+                if new_etag:
+                    self._store_etag(sha256, new_etag)
                 return sha256
 
         except urllib.error.HTTPError as e:
diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index deb105a..0ed6d7d 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -110,11 +110,12 @@ Class Reference
 """
 
 import itertools
+import multiprocessing
 import os
 import subprocess
 import sys
 from contextlib import contextmanager
-from typing import Generator, Optional, Tuple, TYPE_CHECKING
+from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING
 from weakref import WeakValueDictionary
 
 from . import utils
@@ -131,6 +132,14 @@ if TYPE_CHECKING:
     # pylint: enable=cyclic-import
 
 
+T1 = TypeVar("T1")
+T2 = TypeVar("T2")
+
+
+def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[[T1], T2], args: T1) -> None:
+    queue.put(target(*args))
+
+
 class Plugin:
     """Plugin()
 
@@ -212,6 +221,8 @@ class Plugin:
     # scheduling tasks.
     __TABLE = WeakValueDictionary()  # type: WeakValueDictionary[int, Plugin]
 
+    __multiprocessing_context = multiprocessing.get_context("spawn")
+
     def __init__(
         self,
         name: str,
@@ -503,6 +514,21 @@ class Plugin:
         ):
             yield
 
+    def blocking_activity(self, target: Callable[[T1], T2], args: T1, activity_name: str, *, detail: Optional[str] = None, silent_nested: bool = False) -> T2:
+        with self.__context.messenger.timed_activity(
+            activity_name, element_name=self._get_full_name(), detail=detail, silent_nested=silent_nested
+        ):
+            queue = self.__multiprocessing_context.Queue()
+
+            proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
+            proc.start()
+
+            result = queue.get()
+            proc.join()
+
+            return result
+
+
     def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
         """A wrapper for subprocess.call()
 


[buildstream] 11/13: WIP

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5e118e25031ac132cfed6ad1fd951be5560d9a62
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Thu Jul 9 15:16:53 2020 +0100

    WIP
---
 setup.py                                 |  1 +
 src/buildstream/_scheduler/jobs/_job.pyx |  7 +++++++
 src/buildstream/_scheduler/jobs/job.py   | 16 ++++++++++++++--
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/setup.py b/setup.py
index d89d5e6..91232ea 100755
--- a/setup.py
+++ b/setup.py
@@ -305,6 +305,7 @@ BUILD_EXTENSIONS = []
 register_cython_module("buildstream.node")
 register_cython_module("buildstream._loader._loader")
 register_cython_module("buildstream._loader.loadelement", dependencies=["buildstream.node"])
+register_cython_module("buildstream._scheduler.jobs._job")
 register_cython_module("buildstream._yaml", dependencies=["buildstream.node"])
 register_cython_module("buildstream._types")
 register_cython_module("buildstream._utils")
diff --git a/src/buildstream/_scheduler/jobs/_job.pyx b/src/buildstream/_scheduler/jobs/_job.pyx
new file mode 100644
index 0000000..a928305
--- /dev/null
+++ b/src/buildstream/_scheduler/jobs/_job.pyx
@@ -0,0 +1,7 @@
+from cpython.pystate cimport PyThreadState_SetAsyncExc
+from cpython.ref cimport PyObject
+
+
+def abort_thread(long id):
+    res = PyThreadState_SetAsyncExc(id, <PyObject*> BaseException)
+    assert res == 1
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index d461d0f..7d3519d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -29,7 +29,9 @@ import traceback
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
 from ..._message import Message, MessageType
 from ...types import FastEnum
+from ._job import abort_thread
 
+import threading
 
 # Return code values shutdown of job handling child processes
 #
@@ -118,6 +120,7 @@ class Job:
         self._element = None  # The Element() passed to the Job() constructor, if applicable
 
         self._task = None  # The task that is run
+        self._child = None
 
     # set_name()
     #
@@ -136,7 +139,7 @@ class Job:
         self._tries += 1
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
-        child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
+        self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
             self.action_name,
             self._messenger,
             self._scheduler.context.logdir,
@@ -151,7 +154,7 @@ class Job:
 
         async def execute():
             try:
-                ret_code, self._result = await loop.run_in_executor(None, child_job.child_action)
+                ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             except asyncio.CancelledError:
                 ret_code = _ReturnCode.TERMINATED
             except Exception:  # pylint: disable=broad-except
@@ -172,6 +175,7 @@ class Job:
         # Terminate the process using multiprocessing API pathway
         if self._task:
             self._task.cancel()
+            self._child.terminate()
 
         self._terminated = True
 
@@ -425,6 +429,8 @@ class ChildJob:
     # Perform the action in the child process, this calls the action_cb.
     #
     def child_action(self):
+        self._thread_id = threading.current_thread().ident
+
         # Time, log and and run the action function
         #
         with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
@@ -480,3 +486,9 @@ class ChildJob:
                 # make sure we dont try to handle SIGTERM while the process
                 # is already busy in sys.exit()
                 return _ReturnCode.OK, result
+
+    def terminate(self):
+        if self._thread_id is None:
+            return
+
+        abort_thread(self._thread_id)


[buildstream] 07/13: _messenger.py: Add type annotations on the module to help refactors

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8db1e974c5183412625d67d65fe40e6d7b464386
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Tue Jul 7 18:51:36 2020 +0000

    _messenger.py: Add type annotations on the module to help refactors
---
 src/buildstream/_messenger.py | 125 ++++++++++++++++++++++--------------------
 1 file changed, 66 insertions(+), 59 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 84bea6a..eb3bd51 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -1,5 +1,5 @@
 #
-#  Copyright (C) 2019 Bloomberg Finance LP
+#  Copyright (C) 2019-2020 Bloomberg Finance LP
 #
 #  This program is free software; you can redistribute it and/or
 #  modify it under the terms of the GNU Lesser General Public
@@ -21,10 +21,12 @@ import os
 import datetime
 import threading
 from contextlib import contextmanager
+from typing import Callable, Generator, Optional
 
 from . import _signals
 from ._exceptions import BstError
 from ._message import Message, MessageType
+from ._state import State, _Task
 
 
 _RENDER_INTERVAL = datetime.timedelta(seconds=1)
@@ -42,16 +44,21 @@ if "BST_TEST_SUITE" in os.environ:
 class _TimeData:
     __slots__ = ["start_time"]
 
-    def __init__(self, start_time):
+    def __init__(self, start_time: datetime.datetime) -> None:
         self.start_time = start_time
 
 
+class MessageHandlerCallback:
+    def __call__(self, message: Message, is_silenced: bool) -> None:
+        pass
+
+
 class Messenger:
-    def __init__(self):
-        self._state = None
-        self._next_render = None  # A Time object
-        self._active_simple_tasks = 0
-        self._render_status_cb = None
+    def __init__(self) -> None:
+        self._state: Optional[State] = None
+        self._next_render: Optional[datetime.datetime] = None  # A Time object
+        self._active_simple_tasks: int = 0
+        self._render_status_cb: Optional[Callable[[], None]] = None
 
         self._locals = threading.local()
         self._locals.message_handler = None
@@ -64,14 +71,10 @@ class Messenger:
     # Sets the handler for any status messages propagated through
     # the context.
     #
-    # The handler should have the signature:
-    #
-    #   def handler(
-    #      message: _message.Message,  # The message to send.
-    #      is_silenced: bool,          # Whether messages are currently being silenced.
-    #   ) -> None
+    # Args:
+    #   handler: The handler to call on message
     #
-    def set_message_handler(self, handler):
+    def set_message_handler(self, handler: MessageHandlerCallback) -> None:
         self._locals.message_handler = handler
 
     # set_state()
@@ -79,9 +82,9 @@ class Messenger:
     # Sets the State object within the Messenger
     #
     # Args:
-    #    state (State): The state to set
+    #    state: The state to set
     #
-    def set_state(self, state):
+    def set_state(self, state: State) -> None:
         self._state = state
 
     # set_render_status_cb()
@@ -89,20 +92,16 @@ class Messenger:
     # Sets the callback to use to render status
     #
     # Args:
-    #    callback (function): The Callback to be notified
+    #    callback: The Callback to be notified
     #
-    # Callback Args:
-    #    There are no arguments to the callback
-    #
-    def set_render_status_cb(self, callback):
+    def set_render_status_cb(self, callback: Callable[[], None]) -> None:
         self._render_status_cb = callback
 
     # _silent_messages():
     #
-    # Returns:
-    #    (bool): Whether messages are currently being silenced
+    # Returns: Whether messages are currently being silenced
     #
-    def _silent_messages(self):
+    def _silent_messages(self) -> bool:
         return self._locals.silence_scope_depth > 0
 
     # message():
@@ -113,7 +112,7 @@ class Messenger:
     # Args:
     #    message: A Message object
     #
-    def message(self, message):
+    def message(self, message: Message) -> None:
         # If we are recording messages, dump a copy into the open log file.
         self._record_message(message)
 
@@ -132,12 +131,11 @@ class Messenger:
     # _message.unconditional_messages will be silenced.
     #
     # Args:
-    #    actually_silence (bool): Whether to actually do the silencing, if
-    #                             False then this context manager does not
-    #                             affect anything.
+    #    actually_silence: Whether to actually do the silencing, if False then
+    #                      this context manager does not affect anything.
     #
     @contextmanager
-    def silence(self, *, actually_silence=True):
+    def silence(self, *, actually_silence: bool = True) -> Generator[None, None, None]:
         if not actually_silence:
             yield
             return
@@ -154,13 +152,20 @@ class Messenger:
     # Context manager for performing timed activities and logging those
     #
     # Args:
-    #    activity_name (str): The name of the activity
-    #    element_name (str): Optionally, the element full name of the plugin related to the message
-    #    detail (str): An optional detailed message, can be multiline output
-    #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
+    #    activity_name: The name of the activity
+    #    element_name: Optionally, the element full name of the plugin related to the message
+    #    detail: An optional detailed message, can be multiline output
+    #    silent_nested: If True, all but _message.unconditional_messages are silenced
     #
     @contextmanager
-    def timed_activity(self, activity_name, *, element_name=None, detail=None, silent_nested=False):
+    def timed_activity(
+        self,
+        activity_name: str,
+        *,
+        element_name: Optional[str] = None,
+        detail: Optional[str] = None,
+        silent_nested: bool = False
+    ) -> Generator[None, None, None]:
         with self.timed_suspendable() as timedata:
             try:
                 # Push activity depth for status messages
@@ -186,20 +191,26 @@ class Messenger:
     # Context manager for creating a task to report progress to.
     #
     # Args:
-    #    activity_name (str): The name of the activity
-    #    element_name (str): Optionally, the element full name of the plugin related to the message
-    #    full_name (str): Optionally, the distinguishing name of the activity, e.g. element name
-    #    silent_nested (bool): If True, all but _message.unconditional_messages are silenced
+    #    activity_name: The name of the activity
+    #    element_name: Optionally, the element full name of the plugin related to the message
+    #    full_name: Optionally, the distinguishing name of the activity, e.g. element name
+    #    silent_nested: If True, all but _message.unconditional_messages are silenced
     #
-    # Yields:
-    #    Task: A Task object that represents this activity, principally used to report progress
+    # Yields: A Task object that represents this activity, principally used to report progress
     #
     @contextmanager
-    def simple_task(self, activity_name, *, element_name=None, full_name=None, silent_nested=False):
+    def simple_task(
+        self,
+        activity_name: str,
+        *,
+        element_name: Optional[str] = None,
+        full_name: Optional[str] = None,
+        silent_nested: bool = False
+    ) -> Generator[Optional[_Task], None, None]:
         # Bypass use of State when none exists (e.g. tests)
         if not self._state:
             with self.timed_activity(activity_name, element_name=element_name, silent_nested=silent_nested):
-                yield
+                yield None
             return
 
         if not full_name:
@@ -255,17 +266,15 @@ class Messenger:
     # Messenger.get_log_filename() API.
     #
     # Args:
-    #     filename (str): A logging directory relative filename,
-    #                     the pid and .log extension will be automatically
-    #                     appended
+    #     filename: A logging directory relative filename, the pid and .log
+    #               extension will be automatically appended
     #
-    #     logdir (str)  : The path to the log file directory.
+    #     logdir : The path to the log file directory.
     #
-    # Yields:
-    #     (str): The fully qualified log filename
+    # Yields: The fully qualified log filename
     #
     @contextmanager
-    def recorded_messages(self, filename, logdir):
+    def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
         # We dont allow recursing in this context manager, and
         # we also do not allow it in the main process.
         assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
@@ -308,10 +317,9 @@ class Messenger:
     # log file handle when the Messenger.recorded_messages() context
     # manager is active
     #
-    # Returns:
-    #     (file): The active logging file handle, or None
+    # Returns: The active logging file handle, or None
     #
-    def get_log_handle(self):
+    def get_log_handle(self) -> Optional[str]:
         return self._locals.log_handle
 
     # get_log_filename()
@@ -320,10 +328,9 @@ class Messenger:
     # log filename when the Messenger.recorded_messages() context
     # manager is active
     #
-    # Returns:
-    #     (str): The active logging filename, or None
+    # Returns: The active logging filename, or None
     #
-    def get_log_filename(self):
+    def get_log_filename(self) -> str:
         return self._locals.log_filename
 
     # timed_suspendable()
@@ -332,10 +339,10 @@ class Messenger:
     # adjust for clock drift caused by suspending
     #
     # Yields:
-    #    TimeData: An object that contains the time the activity started
+    #    An object that contains the time the activity started
     #
     @contextmanager
-    def timed_suspendable(self):
+    def timed_suspendable(self) -> Generator[_TimeData, None, None]:
         # Note: timedata needs to be in a namedtuple so that values can be
         # yielded that will change
         timedata = _TimeData(start_time=datetime.datetime.now())
@@ -361,7 +368,7 @@ class Messenger:
     # Args:
     #    message (Message): The message to record
     #
-    def _record_message(self, message):
+    def _record_message(self, message: Message) -> None:
 
         if self._locals.log_handle is None:
             return
@@ -408,7 +415,7 @@ class Messenger:
     # Calls the render status callback set in the messenger, but only if a
     # second has passed since it last rendered.
     #
-    def _render_status(self):
+    def _render_status(self) -> None:
         assert self._next_render
 
         # self._render_status_cb()


[buildstream] 02/13: _fixtures.py: Only get the normal number of threads at the start of session

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 065e5637a614128f8ea02ed4b5600045d75d8d3c
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Fri Jul 3 12:47:14 2020 +0000

    _fixtures.py: Only get the normal number of threads at the start of session
    
    This ensures that our tests also correctly shut down background
    threads and do not interfere with each other
---
 src/buildstream/testing/_fixtures.py             | 17 ++++++++++++-----
 src/buildstream/testing/_sourcetests/conftest.py |  2 +-
 tests/conftest.py                                |  6 +++++-
 3 files changed, 18 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/testing/_fixtures.py b/src/buildstream/testing/_fixtures.py
index 5da51bb..5d1c1d2 100644
--- a/src/buildstream/testing/_fixtures.py
+++ b/src/buildstream/testing/_fixtures.py
@@ -14,19 +14,26 @@
 #  You should have received a copy of the GNU Lesser General Public
 #  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 
+# pylint: disable=redefined-outer-name
+
 import psutil
 import pytest
 
-from buildstream import node, utils
+from buildstream import node, utils, DownloadableFileSource
 
-# Catch tests that don't shut down background threads, which could then lead
-# to other tests hanging when BuildStream uses fork().
-@pytest.fixture(autouse=True)
-def thread_check():
+
+@pytest.fixture(autouse=True, scope="session")
+def default_thread_number():
     # xdist/execnet has its own helper thread.
     # Ignore that for `utils._is_single_threaded` checks.
     utils._INITIAL_NUM_THREADS_IN_MAIN_PROCESS = psutil.Process().num_threads()
 
+
+# Catch tests that don't shut down background threads, which could then lead
+# to other tests hanging when BuildStream uses fork().
+@pytest.fixture(autouse=True)
+def thread_check(default_thread_number):
+    assert utils._is_single_threaded()
     yield
     assert utils._is_single_threaded()
 
diff --git a/src/buildstream/testing/_sourcetests/conftest.py b/src/buildstream/testing/_sourcetests/conftest.py
index 64dd404..6790712 100644
--- a/src/buildstream/testing/_sourcetests/conftest.py
+++ b/src/buildstream/testing/_sourcetests/conftest.py
@@ -14,4 +14,4 @@
 #  You should have received a copy of the GNU Lesser General Public
 #  License along with this library. If not, see <http://www.gnu.org/licenses/>.
 
-from .._fixtures import reset_global_node_state, thread_check  # pylint: disable=unused-import
+from .._fixtures import reset_global_node_state, default_thread_number, thread_check  # pylint: disable=unused-import
diff --git a/tests/conftest.py b/tests/conftest.py
index 28e120d..4761131 100755
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -25,7 +25,11 @@ import pkg_resources
 import pytest
 
 from buildstream.testing import register_repo_kind, sourcetests_collection_hook
-from buildstream.testing._fixtures import reset_global_node_state, thread_check  # pylint: disable=unused-import
+from buildstream.testing._fixtures import (  # pylint: disable=unused-import
+    default_thread_number,
+    reset_global_node_state,
+    thread_check,
+)
 from buildstream.testing.integration import integration_cache  # pylint: disable=unused-import