You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:44:36 UTC
[buildstream] 10/17: basic async in stream
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/buildsubprocess
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ab2e085490ffc0595794af29be0005e81d43e47b
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Oct 11 10:45:58 2019 +0100
basic async in stream
---
src/buildstream/_context.py | 3 +
src/buildstream/_exceptions.py | 10 +++
src/buildstream/_frontend/app.py | 57 ++++++------
src/buildstream/_frontend/status.py | 1 +
src/buildstream/_scheduler/scheduler.py | 59 +++++++++----
src/buildstream/_stream.py | 150 ++++++++++++++++++++++++--------
6 files changed, 200 insertions(+), 80 deletions(-)
diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 4700144..cf1f632 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -174,6 +174,9 @@ class Context:
self._workspace_project_cache = WorkspaceProjectCache()
self._cascache = None
+ # An exception caught from subprocessing, used to handle run exceptions in App
+ self._subprocess_exception = None
+
# __enter__()
#
# Called when entering the with-statement context.
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 072be20..d4c793b 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -45,6 +45,16 @@ def get_last_exception():
return le
+# set_last_exception()
+#
+# Sets the last exception from the main process, used if Stream is running a subprocess
+#
+def set_last_exception(exception):
+ if "BST_TEST_SUITE" in os.environ:
+ global _last_exception
+ _last_exception = exception
+
+
# get_last_task_error()
#
# Fetches the last exception from a task
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 471901f..5fe38ce 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -300,39 +300,28 @@ class App:
try:
yield
except BstError as e:
+ self._handle_run_exception(e, session_name)
- # Print a nice summary if this is a session
- if session_name:
- elapsed = self.stream.elapsed_time
-
- if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
- self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
- else:
- self._message(MessageType.FAIL, session_name, elapsed=elapsed)
-
- # Notify session failure
- self._notify("{} failed".format(session_name), e)
-
- if self._started:
- self._print_summary()
-
- # Exit with the error
- self._error_exit(e)
except RecursionError:
click.echo(
"RecursionError: Dependency depth is too large. Maximum recursion depth exceeded.", err=True
)
sys.exit(-1)
- else:
+ if self.context._subprocess_exception:
+ # If a handled exception was thrown in a Stream subprocessed asyncio method, handle it
+ if isinstance(self.context._subprocess_exception, BstError):
+ self._handle_run_exception(self.context._subprocess_exception, session_name)
+ else:
+ # We don't gracefully handle non BstError() Exceptions
+ raise self.context._subprocess_exception # pylint: disable=raising-bad-type
+ elif session_name:
# No exceptions occurred, print session time and summary
- if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
- if self._started:
- self._print_summary()
-
- # Notify session success
- self._notify("{} succeeded".format(session_name), "")
+ self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ if self._started:
+ self._print_summary()
+ # Notify session success
+ self._notify("{} succeeded".format(session_name), "")
# init_project()
#
@@ -972,6 +961,24 @@ class App:
return (project_name, format_version, element_path)
+ def _handle_run_exception(self, exception, session_name):
+ # Print a nice summary if this is a session
+ if session_name:
+ elapsed = self.stream.elapsed_time
+
+ if isinstance(exception, StreamError) and exception.terminated: # pylint: disable=no-member
+ self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
+ else:
+ self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+
+ # Notify session failure
+ self._notify("{} failed".format(session_name), exception)
+
+ if self._started:
+ self._print_summary()
+
+ self._error_exit(exception)
+
#
# Return a value processor for partial choice matching.
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index d3132fe..f16e7d1 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -357,6 +357,7 @@ class _StatusHeader:
#
# ========= 00:00:00 project-name (143/387) =========
#
+
session = self._stream.len_session_elements
total = self._stream.len_total_elements
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d9e6882..3a3cb9c 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -73,6 +73,8 @@ class NotificationType(FastEnum):
START = "start"
TASK_GROUPS = "task_groups"
ELEMENT_TOTALS = "element_totals"
+ FINISH = "finish"
+ SIGTSTP = "sigstp"
# Notification()
@@ -188,6 +190,9 @@ class Scheduler:
# Hold on to the queues to process
self.queues = queues
+ # Check if we're subprocessed
+ subprocessed = bool(self._notify_front_queue)
+
# Ensure that we have a fresh new event loop, in case we want
# to run another test in this thread.
self.loop = asyncio.new_event_loop()
@@ -202,14 +207,14 @@ class Scheduler:
# Handle unix signals while running
self._connect_signals()
- # Watch casd while running to ensure it doesn't die
- self._casd_process = casd_process_manager.process
- _watcher = asyncio.get_child_watcher()
-
- def abort_casd(pid, returncode):
- asyncio.get_event_loop().call_soon(self._abort_on_casd_failure, pid, returncode)
+ # If we're not in a subprocess, watch casd while running to ensure it doesn't die
+ if not subprocessed:
+ self._casd_process = casd_process_manager.process
+ _watcher = asyncio.get_child_watcher()
+ def abort_casd(pid, returncode):
+ self.loop.call_soon(self._abort_on_casd_failure, pid, returncode)
- _watcher.add_child_handler(self._casd_process.pid, abort_casd)
+ _watcher.add_child_handler(self._casd_process.pid, abort_casd)
# Add notification listener if in subprocess
self._start_listening()
@@ -223,9 +228,11 @@ class Scheduler:
self._stop_listening()
self.loop.close()
- # Stop watching casd
- _watcher.remove_child_handler(self._casd_process.pid)
- self._casd_process = None
+ # Stop watching casd if not subprocessed
+ if self._casd_process:
+ _watcher.remove_child_handler(self._casd_process.pid)
+ _watcher.close()
+ self._casd_process = None
# Stop handling unix signals
self._disconnect_signals()
@@ -244,7 +251,7 @@ class Scheduler:
status = SchedStatus.SUCCESS
# Send the state taskgroups if we're running under the subprocess
- if self._notify_front_queue:
+ if subprocessed:
# Don't pickle state
for group in self._state.task_groups.values():
group._state = None
@@ -543,6 +550,8 @@ class Scheduler:
if self.terminated:
return
+ # This event handler is only connected when the scheduler is not running
+ # in a subprocess; it notifies the frontend of the keyboard interrupt
notification = Notification(NotificationType.INTERRUPT)
self._notify_front(notification)
@@ -572,17 +581,29 @@ class Scheduler:
# _connect_signals():
#
- # Connects our signal handler event callbacks to the mainloop
+ # Connects our signal handler event callbacks to the mainloop. Signals
+ # only need to be connected if scheduler running in the 'main' process
#
def _connect_signals(self):
- self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
- self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
- self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+ if not self._notify_front_queue:
+ self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
+ self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
+ self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+ # _disconnect_signals():
+ #
+ # Disconnects our signal handler event callbacks from the mainloop. Signals
+ # only need to be disconnected if scheduler running in the 'main' process
+ #
def _disconnect_signals(self):
- self.loop.remove_signal_handler(signal.SIGINT)
- self.loop.remove_signal_handler(signal.SIGTSTP)
- self.loop.remove_signal_handler(signal.SIGTERM)
+ if not self._notify_front_queue:
+ self.loop.remove_signal_handler(signal.SIGINT)
+ self.loop.remove_signal_handler(signal.SIGTSTP)
+ self.loop.remove_signal_handler(signal.SIGTERM)
+ else:
+ # If running in a subprocess, ignore SIGINT when disconnected
+ # under the interrupted click.prompt()
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
def _terminate_jobs_real(self):
def kill_jobs():
@@ -630,6 +651,8 @@ class Scheduler:
self.jobs_unsuspended()
elif notification.notification_type == NotificationType.RETRY:
self._failure_retry(notification.job_action, notification.element)
+ elif notification.notification_type == NotificationType.SIGTSTP:
+ self._suspend_event()
else:
# Do not raise exception once scheduler process is separated
# as we don't want to pickle exceptions between processes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 8249337..ba61594 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -30,6 +30,7 @@ import shutil
import tarfile
import tempfile
import queue
+import signal
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
from typing import List, Tuple
@@ -44,6 +45,7 @@ from ._exceptions import (
ArtifactError,
set_last_task_error,
SubprocessException,
+ set_last_exception,
)
from ._message import Message, MessageType
from ._scheduler import (
@@ -65,7 +67,7 @@ from ._profile import Topics, PROFILER
from ._state import State
from .types import _KeyStrength, _SchedulerErrorAction
from .plugin import Plugin
-from . import utils, _yaml, _site
+from . import utils, _yaml, _site, _signals
from . import Scope
# Stream()
@@ -91,8 +93,9 @@ class Stream:
self.session_elements = [] # List of elements being processed this session
self.total_elements = [] # Total list of elements based on targets
self.queues = [] # Queue objects
- self.len_session_elements = None
- self.len_total_elements = None
+ self.len_session_elements = ""
+ self.len_total_elements = ""
+ self.loop = None
#
# Private members
@@ -117,6 +120,8 @@ class Stream:
self._scheduler_suspended = False
self._notify_front_queue = None
self._notify_back_queue = None
+ self._casd_process = None
+ self._watcher = None
# init()
#
@@ -134,10 +139,13 @@ class Stream:
# Add traceback pickling support
pickling_support.install()
- try:
- func(*args, **kwargs)
- except Exception as e: # pylint: disable=broad-except
- notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
+ with _signals.blocked([signal.SIGINT, signal.SIGTERM, signal.SIGTSTP], ignore=True):
+ try:
+ func(*args, **kwargs)
+ except Exception as e: # pylint: disable=broad-except
+ notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
+
+ notify.put(Notification(NotificationType.FINISH))
def run_in_subprocess(self, func, *args, **kwargs):
assert not self._subprocess
@@ -161,33 +169,48 @@ class Stream:
self._subprocess.start()
- # TODO connect signal handlers with asyncio
- while self._subprocess.exitcode is None:
- # check every given time interval on subprocess state
- self._subprocess.join(0.01)
- # if no exit code, go back to checking the message queue
- self._loop()
- print("Stopping loop...")
+ # We can now create a new asyncio event loop for this process
+ self.loop = asyncio.new_event_loop()
+ self._connect_signals()
+ self._start_listening()
+ self.loop.set_exception_handler(self._handle_exception)
+ self._watch_casd()
+ self.loop.run_forever()
+
+ # Scheduler has stopped running, so safe to still have async here
+ self._stop_listening()
+ self._stop_watching_casd()
+ self.loop.close()
+ self._disconnect_signals()
+ self.loop = None
+ self._subprocess.join()
+ self._subprocess = None
# Ensure no more notifcations to process
- try:
- while True:
- notification = self._notify_front_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- print("Finished processing notifications")
- pass
+ while not self._notify_front_queue.empty():
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
# cleanup()
#
# Cleans up application state
#
def cleanup(self):
- # Close the notification queue
+ # Close the notification queues
for q in [self._notify_back_queue, self._notify_front_queue]:
if q is not None:
q.close()
- # self._notification_queue.cancel_join_thread()
+ q.join_thread()
+ q = None
+
+ # Close loop
+ if self.loop is not None:
+ self.loop.close()
+ self.loop = None
+
+ # Ensure global event loop policy is unset
+ asyncio.set_event_loop_policy(None)
+
if self._project:
self._project.cleanup()
@@ -1184,10 +1207,14 @@ class Stream:
# Send the notification to suspend jobs
notification = Notification(NotificationType.SUSPEND)
self._notify_back(notification)
+ # Disconnect signals if stream is handling them
+ self._disconnect_signals()
yield
# Unsuspend jobs on context exit
notification = Notification(NotificationType.UNSUSPEND)
self._notify_back(notification)
+ # Connect signals if stream is handling them
+ self._connect_signals()
#############################################################
# Private Methods #
@@ -1431,13 +1458,13 @@ class Stream:
#
self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
- if self._session_start_callback is not None:
- self._notify_front(Notification(NotificationType.START))
-
# Also send through the session & total elements list lengths for status rendering
element_totals = str(len(self.session_elements)), str(len(self.total_elements))
self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
+ if self._session_start_callback is not None:
+ self._notify_front(Notification(NotificationType.START))
+
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
if status == SchedStatus.ERROR:
@@ -1738,6 +1765,9 @@ class Stream:
self._session_start_callback()
elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
self.len_session_elements, self.len_total_elements = notification.element_totals
+ elif notification.notification_type == NotificationType.FINISH:
+ if self.loop:
+ self.loop.stop()
else:
raise StreamError("Unrecognised notification type received")
@@ -1753,18 +1783,64 @@ class Stream:
else:
self._notification_handler(notification)
- # The code to be run by the Stream's event loop while delegating
- # work to a subprocess with the @subprocessed decorator
def _loop(self):
- assert self._notify_front_queue
- # Check for and process new messages
- while True:
- try:
- notification = self._notify_front_queue.get_nowait()
- self._notification_handler(notification)
- except queue.Empty:
- notification = None
- break
+ while not self._notify_front_queue.empty():
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
+
+ def _start_listening(self):
+ if self._notify_front_queue:
+ self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+ def _stop_listening(self):
+ if self._notify_front_queue:
+ self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
+ def _watch_casd(self):
+ if self._context.get_cascache().get_casd_process_manager().process:
+ self._casd_process = self._context.get_cascache().get_casd_process_manager().process
+ self._watcher = asyncio.get_child_watcher()
+ self._watcher.attach_loop(self.loop)
+ def abort_casd(pid, returncode):
+ self.loop.call_soon(self._abort_on_casd_failure, pid, returncode)
+ self._watcher.add_child_handler(self._casd_process.pid, abort_casd)
+
+ def _abort_on_casd_failure(self, pid, returncode):
+ message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
+ self._notify_front(Notification(NotificationType.MESSAGE, message=message))
+ self._casd_process.returncode = returncode
+ notification = Notification(NotificationType.TERMINATE)
+ self._notify_back(notification)
+
+ def _stop_watching_casd(self):
+ self._watcher.remove_child_handler(self._casd_process.pid)
+ self._watcher.close()
+ self._casd_process = None
+
+ def _handle_exception(self, loop, context):
+ exception = context.get("exception")
+ # Set the last exception for the test suite if needed
+ set_last_exception(exception)
+ # Add it to context
+ self._context._subprocess_exception = exception
+ self.loop.stop()
+
+ def _connect_signals(self):
+ if self.loop:
+ self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
+ self.loop.add_signal_handler(
+ signal.SIGTERM, lambda: self._notify_back(Notification(NotificationType.TERMINATE))
+ )
+ self.loop.add_signal_handler(
+ signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP))
+ )
+
+ def _disconnect_signals(self):
+ if self.loop:
+ self.loop.remove_signal_handler(signal.SIGINT)
+ self.loop.remove_signal_handler(signal.SIGTSTP)
+ self.loop.remove_signal_handler(signal.SIGTERM)
+ signal.set_wakeup_fd(-1)
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing