You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:00 UTC

[buildstream] 02/16: Add in dual queue implementation for subprocess build

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 889a6ea39390214d62294f5eee5e37b1a69f8c43
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Sep 16 12:20:06 2019 +0100

    Add in dual queue implementation for subprocess build
---
 src/buildstream/_scheduler/scheduler.py |  42 ++++++++---
 src/buildstream/_stream.py              | 123 ++++++++++++++++++++++++++------
 src/buildstream/utils.py                |   7 +-
 3 files changed, 142 insertions(+), 30 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index c85c141..5a3da69 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,6 +24,7 @@ import asyncio
 from itertools import chain
 import signal
 import datetime
+import queue
 
 # Local imports
 from .resources import Resources
@@ -64,6 +65,7 @@ class NotificationType(FastEnum):
     RETRY = "retry"
     MESSAGE = "message"
     TASK_ERROR = "task_error"
+    EXCEPTION = "exception"
 
 
 # Notification()
@@ -85,7 +87,9 @@ class Notification():
                  time=None,
                  element=None,
                  message=None,
-                 task_error=None):
+                 task_error=None,
+                 for_scheduler=False,
+                 exception=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -94,6 +98,7 @@ class Notification():
         self.element = element
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
+        self.exception = exception
 
 
 # Scheduler()
@@ -119,7 +124,7 @@ class Notification():
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, notification_queue, notifier):
+                 start_time, state, notifier):
 
         #
         # Public members
@@ -143,8 +148,10 @@ class Scheduler():
         self._state = state
         self._casd_process = None             # handle to the casd process for monitoring purpose
 
-        # Bidirectional queue to send notifications back to the Scheduler's owner
-        self._notification_queue = notification_queue
+        # Bidirectional pipe to send notifications back to the Scheduler's owner
+        self._notify_front = None
+        self._notify_back = None
+        # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
         self.resources = Resources(context.sched_builders,
@@ -189,6 +196,10 @@ class Scheduler():
         self._casd_process = casd_process
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        
+        # Add notification handler
+        if self._notify_back:
+            self.loop.call_later(0.01, self._loop)
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -580,12 +591,13 @@ class Scheduler():
         queue.enqueue([element])
 
     def _notify(self, notification):
-        # Scheduler to Stream notifcations on right side
-        self._notification_queue.append(notification)
-        self._notifier()
+        # Use the queue set up by Stream.run_in_subprocess(), else the notifier callback
+        if self._notify_front:
+            self._notify_front.put(notification)
+        else:
+            self._notifier(notification)
 
-    def _stream_notification_handler(self):
-        notification = self._notification_queue.popleft()
+    def _stream_notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -601,6 +613,18 @@ class Scheduler():
             # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
+    def _loop(self):
+        assert self._notify_back
+        # Drain every notification the Stream has queued so far
+        while True:
+            try:
+                notification = self._notify_back.get_nowait()
+            except queue.Empty:
+                break
+            self._stream_notification_handler(notification)
+        # Poll the queue again shortly
+        self.loop.call_later(0.01, self._loop)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 63c09da..7cb3515 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,9 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
+import asyncio
+import functools
+import multiprocessing as mp
 import os
 import sys
 import stat
@@ -26,9 +29,9 @@ import shlex
 import shutil
 import tarfile
 import tempfile
+import queue
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
-from collections import deque
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
@@ -79,13 +82,15 @@ class Stream():
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        self._notification_queue = deque()
+        #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
+        self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
-                                    self._scheduler_notification_handler)
+        # Scheduler may use callback for notification depending on whether it's subprocessed
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
@@ -94,6 +99,8 @@ class Stream():
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
+        self._notify_front = None
+        self._notify_back = None
 
     # init()
     #
@@ -104,11 +111,69 @@ class Stream():
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
 
+    @staticmethod
+    def _subprocess_main(func, notify, *args, **kwargs):
+        # Runs in the forked child: make it the process _is_main_process() reports
+        utils._reset_main_pid()
+        try:
+            func(*args, **kwargs)
+        except Exception as e:  # relayed to the parent, whose notification handler re-raises it
+            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+
+    def run_in_subprocess(self, func, *args, **kwargs):
+        # Run func(*args, **kwargs) in a forked subprocess, relaying the
+        # notifications it sends back to this process until it exits.
+        assert not self._subprocess
+
+        mp_context = mp.get_context(method='fork')
+        process_name = "stream-{}".format(func.__name__)
+
+        self._notify_front = mp.Queue()
+        self._notify_back = mp.Queue()
+        # Tell the scheduler to not use the notifier callback
+        self._scheduler._notify_front = self._notify_front
+        self._scheduler._notify_back = self._notify_back
+
+        # Stream._subprocess_main() expects (func, notify, *args)
+        args = [func, self._notify_front, *args]
+        self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+                                              kwargs=kwargs, name=process_name)
+        self._subprocess.start()
+
+        # TODO connect signal handlers with asyncio
+        while self._subprocess.exitcode is None:
+            # check every given time interval on subprocess state
+            self._subprocess.join(0.01)
+            # if no exit code, go back to checking the message queue
+            self._loop()
+        exitcode = self._subprocess.exitcode
+        self._subprocess = None
+
+        # Set main process back
+        utils._reset_main_pid()
+
+        # Ensure no more notifications to process
+        while True:
+            try:
+                notification = self._notify_front.get_nowait()
+            except queue.Empty:
+                break
+            self._scheduler_notification_handler(notification)
+
+        # _subprocess_main() reports exceptions itself, so non-zero means it did not return normally
+        if exitcode != 0:
+            raise StreamError("Subprocess {} exited with code {}".format(process_name, exitcode))
+
     # cleanup()
     #
     # Cleans up application state
     #
     def cleanup(self):
+        # Close the notification queue
+        for q in [self._notify_back, self._notify_front]:
+            if q is not None:
+                q.close()
+        #self._notification_queue.cancel_join_thread()
         if self._project:
             self._project.cleanup()
 
@@ -233,6 +298,9 @@ class Stream():
         return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
                               usebuildtree=buildtree)
 
+    def build(self, *args, **kwargs):  # Runs _build() (documented below) in a forked subprocess
+        self.run_in_subprocess(self._build, *args, **kwargs)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -249,13 +317,13 @@ class Stream():
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
-    def build(self, targets, *,
-              selection=PipelineSelection.PLAN,
-              track_targets=None,
-              track_except=None,
-              track_cross_junctions=False,
-              ignore_junction_targets=False,
-              remote=None):
+    def _build(self, targets, *,
+               selection=PipelineSelection.PLAN,
+               track_targets=None,
+               track_except=None,
+               track_cross_junctions=False,
+               ignore_junction_targets=False,
+               remote=None):
 
         use_config = True
         if remote:
@@ -1657,11 +1725,7 @@ class Stream():
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self):
-        # Check the queue is there
-        assert self._notification_queue
-        notification = self._notification_queue.pop()
-
+    def _scheduler_notification_handler(self, notification):
         if notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
@@ -1671,6 +1735,7 @@ class Stream():
         elif notification.notification_type == NotificationType.JOB_START:
             self._state.add_task(notification.job_action, notification.full_name, notification.time)
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            # State between scheduler & stream is different if ran in subprocces
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
                 self._state.fail_task(notification.job_action, notification.full_name,
@@ -1685,14 +1750,32 @@ class Stream():
             self._scheduler_suspended = not self._scheduler_suspended
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
+        elif notification.notification_type == NotificationType.EXCEPTION:
+            raise notification.exception
         else:
             raise StreamError("Unrecognised notification type received")
 
     def _notify(self, notification):
-        # Stream to scheduler notifcations on left side
-        self._notification_queue.appendleft(notification)
-        self._notifier()
-
+        # Queue to the subprocessed Scheduler when run_in_subprocess() created the
+        # queues, otherwise call the in-process Scheduler's handler directly
+        if self._notify_back:
+            self._notify_back.put(notification)
+        else:
+            self._scheduler._stream_notification_handler(notification)
+
+    # _loop()
+    #
+    # Drain and handle every notification the subprocess has queued so far.
+    # Called repeatedly by run_in_subprocess() while the subprocess runs.
+    def _loop(self):
+        assert self._notify_front
+        while True:
+            try:
+                notification = self._notify_front.get_nowait()
+            except queue.Empty:
+                break
+            self._scheduler_notification_handler(notification)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index de7c14b..75978ef 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -739,6 +739,11 @@ def _is_main_process():
     return os.getpid() == _MAIN_PID
 
 
+def _reset_main_pid():
+    global _MAIN_PID
+    _MAIN_PID = os.getpid()  # _is_main_process() now returns True in the calling process
+
+
 # Recursively remove directories, ignoring file permissions as much as
 # possible.
 def _force_rmtree(rootpath, **kwargs):
@@ -1429,7 +1434,7 @@ def _is_single_threaded():
     # gRPC threads are not joined when shut down. Wait for them to exit.
     wait = 0.1
     for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)):
-        if process.num_threads() == expected_num_threads:
+        if process.num_threads() in (expected_num_threads, expected_num_threads + 1):
             return True
         time.sleep(wait)
     return False