You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:20:48 UTC

[buildstream] 11/19: basic async in stream

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b39e1170c102d36601ea853837e451841f091942
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Oct 11 10:45:58 2019 +0100

    basic async in stream
---
 src/buildstream/_context.py             |   3 +
 src/buildstream/_exceptions.py          |  10 +++
 src/buildstream/_frontend/app.py        |  57 ++++++------
 src/buildstream/_frontend/status.py     |   1 +
 src/buildstream/_scheduler/scheduler.py |  54 ++++++++----
 src/buildstream/_stream.py              | 148 ++++++++++++++++++++++++--------
 6 files changed, 196 insertions(+), 77 deletions(-)

diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index f426f4b..d7a4437 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -174,6 +174,9 @@ class Context:
         self._workspace_project_cache = WorkspaceProjectCache()
         self._cascache = None
 
+        # An exception caught from subprocessing, used to handle run exceptions in App
+        self._subprocess_exception = None
+
     # __enter__()
     #
     # Called when entering the with-statement context.
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 85fcf61..69acc69 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -45,6 +45,16 @@ def get_last_exception():
     return le
 
 
+# set_last_exception()
+#
+# Sets the last exception from the main process, used if Stream is running a subprocess
+#
+def set_last_exception(exception):
+    if "BST_TEST_SUITE" in os.environ:
+        global _last_exception
+        _last_exception = exception
+
+
 # get_last_task_error()
 #
 # Fetches the last exception from a task
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 471901f..5fe38ce 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -300,39 +300,28 @@ class App:
             try:
                 yield
             except BstError as e:
+                self._handle_run_exception(e, session_name)
 
-                # Print a nice summary if this is a session
-                if session_name:
-                    elapsed = self.stream.elapsed_time
-
-                    if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
-                        self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
-                    else:
-                        self._message(MessageType.FAIL, session_name, elapsed=elapsed)
-
-                        # Notify session failure
-                        self._notify("{} failed".format(session_name), e)
-
-                    if self._started:
-                        self._print_summary()
-
-                # Exit with the error
-                self._error_exit(e)
             except RecursionError:
                 click.echo(
                     "RecursionError: Dependency depth is too large. Maximum recursion depth exceeded.", err=True
                 )
                 sys.exit(-1)
 
-            else:
+            if self.context._subprocess_exception:
+                # If a handled exception was thrown in a Stream subprocessed asyncio method, handle it
+                if isinstance(self.context._subprocess_exception, BstError):
+                    self._handle_run_exception(self.context._subprocess_exception, session_name)
+                else:
+                    # We don't gracefully handle non BstError() Excpetions
+                    raise self.context._subprocess_exception  # pylint: disable=raising-bad-type
+            elif session_name:
                 # No exceptions occurred, print session time and summary
-                if session_name:
-                    self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
-                    if self._started:
-                        self._print_summary()
-
-                    # Notify session success
-                    self._notify("{} succeeded".format(session_name), "")
+                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                if self._started:
+                    self._print_summary()
+                # Notify session success
+                self._notify("{} succeeded".format(session_name), "")
 
     # init_project()
     #
@@ -972,6 +961,24 @@ class App:
 
         return (project_name, format_version, element_path)
 
+    def _handle_run_exception(self, exception, session_name):
+        # Print a nice summary if this is a session
+        if session_name:
+            elapsed = self.stream.elapsed_time
+
+            if isinstance(exception, StreamError) and exception.terminated:  # pylint: disable=no-member
+                self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
+            else:
+                self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+
+                # Notify session failure
+                self._notify("{} failed".format(session_name), exception)
+
+            if self._started:
+                self._print_summary()
+
+        self._error_exit(exception)
+
 
 #
 # Return a value processor for partial choice matching.
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index d3132fe..f16e7d1 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -357,6 +357,7 @@ class _StatusHeader:
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
+
         session = self._stream.len_session_elements
         total = self._stream.len_total_elements
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 87853f0..fbe2599 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -72,6 +72,8 @@ class NotificationType(FastEnum):
     START = "start"
     TASK_GROUPS = "task_groups"
     ELEMENT_TOTALS = "element_totals"
+    FINISH = "finish"
+    SIGTSTP = "sigstp"
 
 
 # Notification()
@@ -184,6 +186,9 @@ class Scheduler:
         # Hold on to the queues to process
         self.queues = queues
 
+        # Check if we're subprocessed
+        subprocessed = bool(self._notify_front_queue)
+
         # Ensure that we have a fresh new event loop, in case we want
         # to run another test in this thread.
         self.loop = asyncio.new_event_loop()
@@ -198,10 +203,11 @@ class Scheduler:
         # Handle unix signals while running
         self._connect_signals()
 
-        # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        # If we're not in a subprocess, watch casd while running to ensure it doesn't die
+        if not subprocessed:
+            self._casd_process = casd_process
+            _watcher = asyncio.get_child_watcher()
+            _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
         # Add notification listener if in subprocess
         self._start_listening()
@@ -215,9 +221,11 @@ class Scheduler:
             self._stop_listening()
             self.loop.close()
 
-        # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        # Stop watching casd if not subprocessed
+        if self._casd_process:
+            _watcher.remove_child_handler(casd_process.pid)
+            _watcher.close()
+            self._casd_process = None
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -236,7 +244,7 @@ class Scheduler:
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front_queue:
+        if subprocessed:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
@@ -529,6 +537,8 @@ class Scheduler:
         if self.terminated:
             return
 
+        # This event handler is only set when not running in a subprocess, scheduler
+        # to handle keyboard interrupt
         notification = Notification(NotificationType.INTERRUPT)
         self._notify_front(notification)
 
@@ -558,17 +568,29 @@ class Scheduler:
 
     # _connect_signals():
     #
-    # Connects our signal handler event callbacks to the mainloop
+    # Connects our signal handler event callbacks to the mainloop. Signals
+    # only need to be connected if scheduler running in the 'main' process
     #
     def _connect_signals(self):
-        self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
-        self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
-        self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+        if not self._notify_front_queue:
+            self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
+            self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
+            self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
 
+    # _disconnect_signals():
+    #
+    # Disconnects our signal handler event callbacks from the mainloop. Signals
+    # only need to be disconnected if scheduler running in the 'main' process
+    #
     def _disconnect_signals(self):
-        self.loop.remove_signal_handler(signal.SIGINT)
-        self.loop.remove_signal_handler(signal.SIGTSTP)
-        self.loop.remove_signal_handler(signal.SIGTERM)
+        if not self._notify_front_queue:
+            self.loop.remove_signal_handler(signal.SIGINT)
+            self.loop.remove_signal_handler(signal.SIGTSTP)
+            self.loop.remove_signal_handler(signal.SIGTERM)
+        else:
+            # If running in a subprocess, ignore SIGINT when disconnected
+            # under the interrupted click.prompt()
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
 
     def _terminate_jobs_real(self):
         def kill_jobs():
@@ -616,6 +638,8 @@ class Scheduler:
             self.jobs_unsuspended()
         elif notification.notification_type == NotificationType.RETRY:
             self._failure_retry(notification.job_action, notification.element)
+        elif notification.notification_type == NotificationType.SIGTSTP:
+            self._suspend_event()
         else:
             # Do not raise exception once scheduler process is separated
             # as we don't want to pickle exceptions between processes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 06643d9..4ac73e6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -30,6 +30,7 @@ import shutil
 import tarfile
 import tempfile
 import queue
+import signal
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 from tblib import pickling_support
@@ -43,6 +44,7 @@ from ._exceptions import (
     ArtifactError,
     set_last_task_error,
     SubprocessException,
+    set_last_exception,
 )
 from ._message import Message, MessageType
 from ._scheduler import (
@@ -63,7 +65,7 @@ from ._profile import Topics, PROFILER
 from ._state import State
 from .types import _KeyStrength, _SchedulerErrorAction
 from .plugin import Plugin
-from . import utils, _yaml, _site
+from . import utils, _yaml, _site, _signals
 from . import Scope, Consistency
 
 # Stream()
@@ -89,8 +91,9 @@ class Stream:
         self.session_elements = []  # List of elements being processed this session
         self.total_elements = []  # Total list of elements based on targets
         self.queues = []  # Queue objects
-        self.len_session_elements = None
-        self.len_total_elements = None
+        self.len_session_elements = ""
+        self.len_total_elements = ""
+        self.loop = None
 
         #
         # Private members
@@ -116,6 +119,8 @@ class Stream:
         self._scheduler_suspended = False
         self._notify_front_queue = None
         self._notify_back_queue = None
+        self._casd_process = None
+        self._watcher = None
 
     # init()
     #
@@ -133,10 +138,13 @@ class Stream:
 
         # Add traceback pickling support
         pickling_support.install()
-        try:
-            func(*args, **kwargs)
-        except Exception as e:  # pylint: disable=broad-except
-            notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
+        with _signals.blocked([signal.SIGINT, signal.SIGTERM, signal.SIGTSTP], ignore=True):
+            try:
+                func(*args, **kwargs)
+            except Exception as e:  # pylint: disable=broad-except
+                notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
+
+        notify.put(Notification(NotificationType.FINISH))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         assert not self._subprocess
@@ -160,33 +168,48 @@ class Stream:
 
         self._subprocess.start()
 
-        # TODO connect signal handlers with asyncio
-        while self._subprocess.exitcode is None:
-            # check every given time interval on subprocess state
-            self._subprocess.join(0.01)
-            # if no exit code, go back to checking the message queue
-            self._loop()
-        print("Stopping loop...")
+        # We can now launch another async
+        self.loop = asyncio.new_event_loop()
+        self._connect_signals()
+        self._start_listening()
+        self.loop.set_exception_handler(self._handle_exception)
+        self._watch_casd()
+        self.loop.run_forever()
+
+        # Scheduler has stopped running, so safe to still have async here
+        self._stop_listening()
+        self._stop_watching_casd()
+        self.loop.close()
+        self._disconnect_signals()
+        self.loop = None
+        self._subprocess.join()
+        self._subprocess = None
 
         # Ensure no more notifcations to process
-        try:
-            while True:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-        except queue.Empty:
-            print("Finished processing notifications")
-            pass
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
 
     # cleanup()
     #
     # Cleans up application state
     #
     def cleanup(self):
-        # Close the notification queue
+        # Close the notification queues
         for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close()
-        # self._notification_queue.cancel_join_thread()
+                q.join_thread()
+                q = None
+
+        # Close loop
+        if self.loop is not None:
+            self.loop.close()
+            self.loop = None
+
+        # Ensure global event loop policy is unset
+        asyncio.set_event_loop_policy(None)
+
         if self._project:
             self._project.cleanup()
 
@@ -1249,10 +1272,14 @@ class Stream:
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
         self._notify_back(notification)
+        # Disconnect signals if stream is handling them
+        self._disconnect_signals()
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
         self._notify_back(notification)
+        # Connect signals if stream is handling them
+        self._connect_signals()
 
     #############################################################
     #                    Private Methods                        #
@@ -1454,13 +1481,13 @@ class Stream:
         #
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
-        if self._session_start_callback is not None:
-            self._notify_front(Notification(NotificationType.START))
-
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
         self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
 
+        if self._session_start_callback is not None:
+            self._notify_front(Notification(NotificationType.START))
+
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
         if status == SchedStatus.ERROR:
@@ -1781,6 +1808,9 @@ class Stream:
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.FINISH:
+            if self.loop:
+                self.loop.stop()
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1796,18 +1826,62 @@ class Stream:
         else:
             self._notification_handler(notification)
 
-    # The code to be run by the Stream's event loop while delegating
-    # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_front_queue:
+            self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_front_queue:
+            self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
+    def _watch_casd(self):
+        if self._context.get_cascache()._casd_process:
+            self._casd_process = self._context.get_cascache().get_casd_process()
+            self._watcher = asyncio.get_child_watcher()
+            self._watcher.attach_loop(self.loop)
+            self._watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
+
+    def _abort_on_casd_failure(self, pid, returncode):
+        message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
+        self._casd_process.returncode = returncode
+        notification = Notification(NotificationType.TERMINATE)
+        self._notify_back(notification)
+
+    def _stop_watching_casd(self):
+        self._watcher.remove_child_handler(self._casd_process.pid)
+        self._watcher.close()
+        self._casd_process = None
+
+    def _handle_exception(self, loop, context):
+        exception = context.get("exception")
+        # Set the last exception for the test suite if needed
+        set_last_exception(exception)
+        # Add it to context
+        self._context._subprocess_exception = exception
+        self.loop.stop()
+
+    def _connect_signals(self):
+        if self.loop:
+            self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
+            self.loop.add_signal_handler(
+                signal.SIGTERM, lambda: self._notify_back(Notification(NotificationType.TERMINATE))
+            )
+            self.loop.add_signal_handler(
+                signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP))
+            )
+
+    def _disconnect_signals(self):
+        if self.loop:
+            self.loop.remove_signal_handler(signal.SIGINT)
+            self.loop.remove_signal_handler(signal.SIGTSTP)
+            self.loop.remove_signal_handler(signal.SIGTERM)
+            signal.set_wakeup_fd(-1)
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing