Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:46:52 UTC

[buildstream] 02/18: Add dual-queue implementation for subprocess build.

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/subrebase
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 519737f25cc40814819cd1dac4ba072ae34f37ce
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Sep 16 12:20:06 2019 +0100

    Add dual-queue implementation for subprocess build.
    
    This also adapts the PID handling in utils.py to account for
    the stream running in a subprocess, and changes how callers
    assert that they're in the main process or in a job process.
---
 doc/source/hacking/coding_guidelines.rst  |   2 +-
 src/buildstream/_messenger.py             |   2 +-
 src/buildstream/_scheduler/scheduler.py   |  42 +++++++++---
 src/buildstream/_stream.py                | 104 ++++++++++++++++++++++++++----
 src/buildstream/_workspaces.py            |   2 +-
 src/buildstream/element.py                |   6 +-
 src/buildstream/sandbox/_sandboxremote.py |   2 +-
 src/buildstream/utils.py                  |  31 ++++++---
 8 files changed, 152 insertions(+), 39 deletions(-)
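
The core of the change is to replace the single shared deque with a pair of
multiprocessing queues, one per direction. A minimal sketch of the pattern
(the front/back naming follows the patch; the drain() helper is illustrative,
not part of the change):

    import multiprocessing as mp
    import queue

    # 'front' carries Scheduler -> Stream notifications,
    # 'back' carries Stream -> Scheduler notifications.
    notify_front = mp.Queue()
    notify_back = mp.Queue()

    def drain(q, handler):
        # Non-blocking drain, as in the _loop() helpers in the patch below
        while True:
            try:
                handler(q.get_nowait())
            except queue.Empty:
                break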

diff --git a/doc/source/hacking/coding_guidelines.rst b/doc/source/hacking/coding_guidelines.rst
index ecab241..10f76e9 100644
--- a/doc/source/hacking/coding_guidelines.rst
+++ b/doc/source/hacking/coding_guidelines.rst
@@ -609,7 +609,7 @@ In these cases, do **not** raise any of the ``BstError`` class exceptions.
 
 Instead, use the ``assert`` statement, e.g.::
 
-  assert utils._is_main_process(), \
+  assert not utils._is_job_process(), \
       "Attempted to save workspace configuration from child process"
 
 This will result in a ``BUG`` message with the stack trace included being
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 03b2833..9e2269f 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -270,7 +270,7 @@ class Messenger:
         # we also do not allow it in the main process.
         assert self._log_handle is None
         assert self._log_filename is None
-        assert not utils._is_main_process()
+        assert utils._is_job_process()
 
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 4e034c5..402ce1d 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,6 +24,7 @@ import asyncio
 from itertools import chain
 import signal
 import datetime
+import queue
 
 # Local imports
 from .resources import Resources
@@ -67,6 +68,7 @@ class NotificationType(FastEnum):
     RETRY = "retry"
     MESSAGE = "message"
     TASK_ERROR = "task_error"
+    EXCEPTION = "exception"
 
 
 # Notification()
@@ -88,7 +90,9 @@ class Notification:
         time=None,
         element=None,
         message=None,
-        task_error=None
+        task_error=None,
+        for_scheduler=None,
+        exception=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -98,6 +102,7 @@ class Notification:
         self.element = element
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
+        self.exception = exception
 
 
 # Scheduler()
@@ -121,7 +126,7 @@ class Notification:
 #    ticker_callback: A callback call once per second
 #
 class Scheduler:
-    def __init__(self, context, start_time, state, notification_queue, notifier):
+    def __init__(self, context, start_time, state, notifier):
 
         #
         # Public members
@@ -145,8 +150,10 @@ class Scheduler:
         self._state = state
         self._casd_process = None  # handle to the casd process for monitoring purpose
 
-        # Bidirectional queue to send notifications back to the Scheduler's owner
-        self._notification_queue = notification_queue
+        # Front/back queues used to exchange notifications with the Scheduler's owner
+        self._notify_front = None
+        self._notify_back = None
+        # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
         self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
@@ -190,6 +197,10 @@ class Scheduler:
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
 
+        # Add notification handler
+        if self._notify_back:
+            self.loop.call_later(0.01, self._loop)
+
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
@@ -573,12 +584,13 @@ class Scheduler:
         queue.enqueue([element])
 
     def _notify(self, notification):
-        # Scheduler to Stream notifcations on right side
-        self._notification_queue.append(notification)
-        self._notifier()
+        # Check if we need to call the notifier callback
+        if self._notify_front:
+            self._notify_front.put(notification)
+        else:
+            self._notifier(notification)
 
-    def _stream_notification_handler(self):
-        notification = self._notification_queue.popleft()
+    def _stream_notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -594,6 +606,18 @@ class Scheduler:
             # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
+    def _loop(self):
+        assert self._notify_back
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notify_back.get_nowait()
+                self._stream_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
+        self.loop.call_later(0.01, self._loop)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
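
The scheduler-side _loop() above keeps the asyncio event loop responsive by
polling the back queue with get_nowait() and re-arming itself via call_later().
A self-contained sketch of the same technique (the 0.01s interval matches the
patch; the rest of the names are illustrative):

    import asyncio
    import multiprocessing as mp
    import queue

    notify_back = mp.Queue()

    def poll(loop):
        # Drain whatever is queued right now, without blocking the loop
        while True:
            try:
                notification = notify_back.get_nowait()
                print("handling:", notification)
            except queue.Empty:
                break
        loop.call_later(0.01, poll, loop)  # re-arm, as _loop() does

    loop = asyncio.new_event_loop()
    notify_back.put("terminate")
    loop.call_later(0.01, poll, loop)
    loop.call_later(0.1, loop.stop)  # stop the demo after a few polls
    loop.run_forever()
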
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 5bb7de3..e0c3383 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,9 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
+import asyncio
+import functools
+import multiprocessing as mp
 import os
 import sys
 import stat
@@ -26,9 +29,9 @@ import shlex
 import shutil
 import tarfile
 import tempfile
+import queue
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
-from collections import deque
 from typing import List, Tuple
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -90,14 +93,13 @@ class Stream:
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        self._notification_queue = deque()
+        # self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
+        self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(
-            context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
-        )
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
         self._interrupt_callback = interrupt_callback
@@ -105,6 +107,8 @@ class Stream:
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
+        self._notify_front = None
+        self._notify_back = None
 
     # init()
     #
@@ -115,11 +119,65 @@ class Stream:
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
 
+    @staticmethod
+    def _subprocess_main(func, notify, *args, **kwargs):
+        # Set main process
+        utils._set_stream_pid()
+
+        try:
+            func(*args, **kwargs)
+        except Exception as e:
+            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+
+    def run_in_subprocess(self, func, *args, **kwargs):
+        assert not self._subprocess
+
+        mp_context = mp.get_context(method="fork")
+        process_name = "stream-{}".format(func.__name__)
+
+        self._notify_front = mp.Queue()
+        self._notify_back = mp.Queue()
+        # Tell the scheduler to not use the notifier callback
+        self._scheduler._notify_front = self._notify_front
+        self._scheduler._notify_back = self._notify_back
+
+        args = list(args)
+        args.insert(0, self._notify_front)
+        args.insert(0, func)
+
+        self._subprocess = mp_context.Process(
+            target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name
+        )
+
+        self._subprocess.start()
+
+        # TODO connect signal handlers with asyncio
+        while self._subprocess.exitcode is None:
+            # check every given time interval on subprocess state
+            self._subprocess.join(0.01)
+            # if no exit code, go back to checking the message queue
+            self._loop()
+        print("Stopping loop...")
+
+        # Ensure there are no more notifications to process
+        try:
+            while True:
+                notification = self._notify_front.get_nowait()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            print("Finished processing notifications")
+            pass
+
     # cleanup()
     #
     # Cleans up application state
     #
     def cleanup(self):
+        # Close the notification queues
+        for q in [self._notify_back, self._notify_front]:
+            if q is not None:
+                q.close()
+        # self._notification_queue.cancel_join_thread()
         if self._project:
             self._project.cleanup()
 
@@ -261,6 +319,9 @@ class Stream:
             scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree
         )
 
+    def build(self, *args, **kwargs):
+        self.run_in_subprocess(self._build, *args, **kwargs)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -274,7 +335,7 @@ class Stream:
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
-    def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
+    def _build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
 
         use_config = True
         if remote:
@@ -1624,11 +1685,7 @@ class Stream:
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self):
-        # Check the queue is there
-        assert self._notification_queue
-        notification = self._notification_queue.pop()
-
+    def _scheduler_notification_handler(self, notification):
         if notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
@@ -1638,6 +1695,7 @@ class Stream:
         elif notification.notification_type == NotificationType.JOB_START:
             self._state.add_task(notification.job_action, notification.full_name, notification.time)
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            # State between scheduler & stream is different if run in a subprocess
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
                 self._state.fail_task(notification.job_action, notification.full_name, notification.element)
@@ -1651,13 +1709,31 @@ class Stream:
             self._scheduler_suspended = not self._scheduler_suspended
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
+        elif notification.notification_type == NotificationType.EXCEPTION:
+            raise notification.exception
         else:
             raise StreamError("Unrecognised notification type received")
 
     def _notify(self, notification):
-        # Stream to scheduler notifcations on left side
-        self._notification_queue.appendleft(notification)
-        self._notifier()
+        # Set that the notification is for the scheduler
+        # notification.for_scheduler = True
+        if self._notify_back:
+            self._notify_back.put(notification)
+        else:
+            self._scheduler._stream_notification_handler(notification)
+
+    # The code to be run by the Stream while delegating
+    # work to a subprocess via run_in_subprocess()
+    def _loop(self):
+        assert self._notify_front
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notify_front.get_nowait()
+                self._scheduler_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
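
run_in_subprocess() forks the child, then alternates between a short join()
and draining the front queue, so notifications keep flowing while the build
runs; exceptions are shipped back over the queue as EXCEPTION notifications
rather than being pickled implicitly across processes. A condensed,
illustrative version (simplified names, plain tuples instead of the
Notification class):

    import multiprocessing as mp
    import queue

    def _child_main(func, notify, *args, **kwargs):
        # Mirror of Stream._subprocess_main(): failures travel
        # over the queue instead of dying silently in the child.
        try:
            func(*args, **kwargs)
        except Exception as e:  # pylint: disable=broad-except
            notify.put(("exception", e))

    def _drain(q):
        while True:
            try:
                kind, payload = q.get_nowait()
                if kind == "exception":
                    raise payload  # surface the child's failure here
            except queue.Empty:
                break

    def run_in_subprocess(func, *args, **kwargs):
        ctx = mp.get_context("fork")
        front = ctx.Queue()
        proc = ctx.Process(target=_child_main, args=(func, front) + args, kwargs=kwargs)
        proc.start()
        while proc.exitcode is None:
            proc.join(0.01)  # brief join, then service notifications
            _drain(front)
        _drain(front)        # pick up anything queued after exit
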
diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py
index 3d50fd9..f9636f8 100644
--- a/src/buildstream/_workspaces.py
+++ b/src/buildstream/_workspaces.py
@@ -518,7 +518,7 @@ class Workspaces:
     # create_workspace permanent
     #
     def save_config(self):
-        assert utils._is_main_process()
+        assert not utils._is_job_process()
 
         config = {
             "format-version": BST_WORKSPACE_FORMAT_VERSION,
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index ffce257..6918c9c 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -769,7 +769,7 @@ class Element(Plugin):
                 self.info("Resetting workspace state, last successful build is no longer in the cache")
 
                 # In case we are staging in the main process
-                if utils._is_main_process():
+                if not utils._is_job_process():
                     context.get_workspaces().save_config()
 
         for dep in self.dependencies(scope):
@@ -794,7 +794,7 @@ class Element(Plugin):
 
                     # In case we are running `bst shell`, this happens in the
                     # main process and we need to update the workspace config
-                    if utils._is_main_process():
+                    if not utils._is_job_process():
                         context.get_workspaces().save_config()
 
             result = dep.stage_artifact(
@@ -1588,7 +1588,7 @@ class Element(Plugin):
         self._update_ready_for_runtime_and_cached()
 
         if self._get_workspace() and self._cached_success():
-            assert utils._is_main_process(), "Attempted to save workspace configuration from child process"
+            assert not utils._is_job_process(), "Attempted to save workspace configuration from child process"
             #
             # Note that this block can only happen in the
             # main process, since `self._cached_success()` cannot
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index d4ffd64..c07ab8c 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -59,7 +59,7 @@ class SandboxRemote(SandboxREAPI):
             return
 
         # gRPC doesn't support fork without exec, which is used in the main process.
-        assert not utils._is_main_process()
+        assert utils._is_job_process()
 
         self.storage_url = config.storage_service["url"]
         self.exec_url = config.exec_service["url"]
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 7f7bf67..3902f7d 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -58,6 +58,9 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
 # Main process pid
 _MAIN_PID = os.getpid()
 
+# This differs from _MAIN_PID when the stream entry point runs in a subprocess
+_STREAM_PID = _MAIN_PID
+
 # The number of threads in the main process at startup.
 # This is 1 except for certain test environments (xdist/execnet).
 _INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1
@@ -770,13 +773,18 @@ def _pretty_size(size, dec_places=0):
     return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
 
 
-# _is_main_process()
+# _is_job_process()
 #
-# Return whether we are in the main process or not.
+# Return whether we are in a job process.
 #
-def _is_main_process():
-    assert _MAIN_PID is not None
-    return os.getpid() == _MAIN_PID
+def _is_job_process():
+    assert _STREAM_PID is not None
+    return os.getpid() != _STREAM_PID
+
+
+def _set_stream_pid() -> None:
+    global _STREAM_PID  # pylint: disable=global-statement
+    _STREAM_PID = os.getpid()
 
 
 # Recursively remove directories, ignoring file permissions as much as
@@ -1479,10 +1487,15 @@ def _is_single_threaded():
     # Use psutil as threading.active_count() doesn't include gRPC threads.
     process = psutil.Process()
 
-    if process.pid == _MAIN_PID:
-        expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
-    else:
-        expected_num_threads = 1
+    expected_num_threads = 1
+
+    if process.pid == _STREAM_PID:
+        if _STREAM_PID != _MAIN_PID:
+            # multiprocessing.Queue() has a background thread for object pickling,
+            # see https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
+            expected_num_threads += 1
+        else:
+            expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
 
     # gRPC threads are not joined when shut down. Wait for them to exit.
     wait = 0.1
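
The net effect in utils.py is that process identity is now keyed off the
stream's PID rather than the original main PID: anything that is neither the
main process nor the (possibly forked) stream process counts as a job. A toy
demonstration of the intended semantics, using local mirrors of the helpers
(illustrative only):

    import multiprocessing as mp
    import os

    _MAIN_PID = os.getpid()    # recorded at import time
    _STREAM_PID = _MAIN_PID    # rebound inside the stream subprocess

    def _set_stream_pid():
        global _STREAM_PID
        _STREAM_PID = os.getpid()

    def _is_job_process():
        return os.getpid() != _STREAM_PID

    def stream_entry():
        _set_stream_pid()             # claim stream identity after fork
        assert not _is_job_process()  # the stream process is not a job

    if __name__ == "__main__":
        assert not _is_job_process()  # neither is the main process
        proc = mp.get_context("fork").Process(target=stream_entry)
        proc.start()
        proc.join()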