Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:44:28 UTC
[buildstream] 02/17: Add in dual queue implementation for subprocess build.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/buildsubprocess
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ee608508d792d56dd077aa6adc52da492a67e1cc
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Sep 16 12:20:06 2019 +0100
Add in dual queue implementation for subprocess build.
This also adapts utils.py's handling of PIDs to account for the
stream multiprocessing, and updates how callers assert that they're
in the 'main_process' or in a job process.
---
doc/source/hacking/coding_guidelines.rst | 2 +-
src/buildstream/_messenger.py | 2 +-
src/buildstream/_scheduler/scheduler.py | 43 +++++++++---
src/buildstream/_stream.py | 104 ++++++++++++++++++++++++++----
src/buildstream/_workspaces.py | 2 +-
src/buildstream/element.py | 6 +-
src/buildstream/sandbox/_sandboxremote.py | 2 +-
src/buildstream/utils.py | 31 ++++++---
8 files changed, 153 insertions(+), 39 deletions(-)
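In outline, this commit replaces the single deque shared between Stream and
Scheduler (plus a notifier callback) with a pair of multiprocessing queues: a
"front" queue carrying Scheduler-to-Stream notifications out of the build
subprocess, and a "back" queue carrying Stream-to-Scheduler control messages
into it. A minimal standalone sketch of that pattern follows; the names,
payloads and 10ms interval are illustrative, not the exact BuildStream code.

    import multiprocessing as mp
    import queue

    def child(notify_front, notify_back):
        # Subprocess side: report progress on the front queue and
        # poll the back queue for control messages from the parent.
        notify_front.put("job-started")
        try:
            control = notify_back.get(timeout=1.0)
        except queue.Empty:
            control = None
        notify_front.put("saw-control:{}".format(control))

    if __name__ == "__main__":
        notify_front = mp.Queue()  # subprocess -> parent notifications
        notify_back = mp.Queue()   # parent -> subprocess control
        proc = mp.Process(target=child, args=(notify_front, notify_back))
        proc.start()
        notify_back.put("quit")
        while proc.exitcode is None:
            proc.join(0.01)        # poll the subprocess state
            try:
                while True:        # drain any pending notifications
                    print(notify_front.get_nowait())
            except queue.Empty:
                pass
        # Final drain after the subprocess exits, as the commit also does
        try:
            while True:
                print(notify_front.get_nowait())
        except queue.Empty:
            pass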
diff --git a/doc/source/hacking/coding_guidelines.rst b/doc/source/hacking/coding_guidelines.rst
index ecab241..10f76e9 100644
--- a/doc/source/hacking/coding_guidelines.rst
+++ b/doc/source/hacking/coding_guidelines.rst
@@ -609,7 +609,7 @@ In these cases, do **not** raise any of the ``BstError`` class exceptions.
Instead, use the ``assert`` statement, e.g.::
- assert utils._is_main_process(), \
+ assert not utils._is_job_process(), \
"Attempted to save workspace configuration from child process"
This will result in a ``BUG`` message with the stack trace included being
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 03b2833..9e2269f 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -270,7 +270,7 @@ class Messenger:
# we also do not allow it in the main process.
assert self._log_handle is None
assert self._log_filename is None
- assert not utils._is_main_process()
+ assert utils._is_job_process()
# Create the fully qualified logfile in the log directory,
# appending the pid and .log extension at the end.
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 0700186..df9819b 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -25,6 +25,7 @@ import asyncio
from itertools import chain
import signal
import datetime
+import queue
# Local imports
from .resources import Resources
@@ -68,6 +69,7 @@ class NotificationType(FastEnum):
RETRY = "retry"
MESSAGE = "message"
TASK_ERROR = "task_error"
+ EXCEPTION = "exception"
# Notification()
@@ -89,7 +91,9 @@ class Notification:
time=None,
element=None,
message=None,
- task_error=None
+ task_error=None,
+ for_scheduler=None,
+ exception=None
):
self.notification_type = notification_type
self.full_name = full_name
@@ -99,6 +103,7 @@ class Notification:
self.element = element
self.message = message
self.task_error = task_error # Tuple of domain & reason
+ self.exception = exception
# Scheduler()
@@ -122,7 +127,7 @@ class Notification:
# ticker_callback: A callback to call once per second
#
class Scheduler:
- def __init__(self, context, start_time, state, notification_queue, notifier):
+ def __init__(self, context, start_time, state, notifier):
#
# Public members
@@ -148,8 +153,11 @@ class Scheduler:
self._sched_handle = None # Whether a scheduling job is already scheduled or not
- # Bidirectional queue to send notifications back to the Scheduler's owner
- self._notification_queue = notification_queue
+ # Pair of queues to send notifications back to the Scheduler's owner
+ self._notify_front = None
+ self._notify_back = None
+
+ # Notifier callback to use if not running in a subprocess
self._notifier = notifier
self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
@@ -197,6 +205,10 @@ class Scheduler:
_watcher.add_child_handler(self._casd_process.pid, abort_casd)
+ # Add notification handler
+ if self._notify_back:
+ self.loop.call_later(0.01, self._loop)
+
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
# Run the queues
@@ -586,12 +598,13 @@ class Scheduler:
queue.enqueue([element])
def _notify(self, notification):
- # Scheduler to Stream notifcations on right side
- self._notification_queue.append(notification)
- self._notifier()
+ # Check if we need to call the notifier callback
+ if self._notify_front:
+ self._notify_front.put(notification)
+ else:
+ self._notifier(notification)
- def _stream_notification_handler(self):
- notification = self._notification_queue.popleft()
+ def _stream_notification_handler(self, notification):
if notification.notification_type == NotificationType.TERMINATE:
self.terminate_jobs()
elif notification.notification_type == NotificationType.QUIT:
@@ -607,6 +620,18 @@ class Scheduler:
# as we don't want to pickle exceptions between processes
raise ValueError("Unrecognised notification type received")
+ def _loop(self):
+ assert self._notify_back
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notify_back.get_nowait()
+ self._stream_notification_handler(notification)
+ except queue.Empty:
+ notification = None
+ break
+ self.loop.call_later(0.01, self._loop)
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
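The `_loop` method added above re-arms itself on the scheduler's asyncio event
loop every 10ms, draining the back queue each time it fires. A self-contained
sketch of that polling idiom (the handler, payload and timings are
illustrative):

    import asyncio
    import queue

    def _poll(loop, notifications):
        # Drain every pending notification, then re-arm the callback
        try:
            while True:
                handle(notifications.get_nowait())
        except queue.Empty:
            pass
        loop.call_later(0.01, _poll, loop, notifications)

    def handle(notification):
        print("handled:", notification)

    if __name__ == "__main__":
        notifications = queue.Queue()
        notifications.put("terminate")
        loop = asyncio.new_event_loop()
        loop.call_later(0.01, _poll, loop, notifications)
        loop.call_later(0.1, loop.stop)   # stand-in for scheduler shutdown
        loop.run_forever()
        loop.close()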
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index ab270b4..c2263c3 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,9 @@
# Jürg Billeter <ju...@codethink.co.uk>
# Tristan Maat <tr...@codethink.co.uk>
+import asyncio
+import functools
+import multiprocessing as mp
import os
import sys
import stat
@@ -26,9 +29,9 @@ import shlex
import shutil
import tarfile
import tempfile
+import queue
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
-from collections import deque
from typing import List, Tuple
from ._artifactelement import verify_artifact_ref, ArtifactElement
@@ -90,14 +93,13 @@ class Stream:
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
- self._notification_queue = deque()
+ # self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
+ self._subprocess = None
self._starttime = session_start # Synchronised with Scheduler's relative start time
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(
- context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
- )
+ self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
self._session_start_callback = session_start_callback
self._ticker_callback = ticker_callback
self._interrupt_callback = interrupt_callback
@@ -105,6 +107,8 @@ class Stream:
self._scheduler_running = False
self._scheduler_terminated = False
self._scheduler_suspended = False
+ self._notify_front = None
+ self._notify_back = None
# init()
#
@@ -115,11 +119,65 @@ class Stream:
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
+ @staticmethod
+ def _subprocess_main(func, notify, *args, **kwargs):
+ # Record this process as the stream process
+ utils._set_stream_pid()
+
+ try:
+ func(*args, **kwargs)
+ except Exception as e:
+ notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+
+ def run_in_subprocess(self, func, *args, **kwargs):
+ assert not self._subprocess
+
+ mp_context = mp.get_context(method="fork")
+ process_name = "stream-{}".format(func.__name__)
+
+ self._notify_front = mp.Queue()
+ self._notify_back = mp.Queue()
+ # Tell the scheduler to not use the notifier callback
+ self._scheduler._notify_front = self._notify_front
+ self._scheduler._notify_back = self._notify_back
+
+ args = list(args)
+ args.insert(0, self._notify_front)
+ args.insert(0, func)
+
+ self._subprocess = mp_context.Process(
+ target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name
+ )
+
+ self._subprocess.start()
+
+ # TODO connect signal handlers with asyncio
+ while self._subprocess.exitcode is None:
+ # check every given time interval on subprocess state
+ self._subprocess.join(0.01)
+ # if no exit code, go back to checking the message queue
+ self._loop()
+ print("Stopping loop...")
+
+ # Ensure there are no more notifications to process
+ try:
+ while True:
+ notification = self._notify_front.get_nowait()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ print("Finished processing notifications")
+ pass
+
# cleanup()
#
# Cleans up application state
#
def cleanup(self):
+ # Close the notification queues
+ for q in [self._notify_back, self._notify_front]:
+ if q is not None:
+ q.close()
+ # self._notification_queue.cancel_join_thread()
if self._project:
self._project.cleanup()
@@ -261,6 +319,9 @@ class Stream:
scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree
)
+ def build(self, *args, **kwargs):
+ self.run_in_subprocess(self._build, *args, **kwargs)
+
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -274,7 +335,7 @@ class Stream:
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
- def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
+ def _build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
use_config = True
if remote:
@@ -1624,11 +1685,7 @@ class Stream:
return element_targets, artifact_refs
- def _scheduler_notification_handler(self):
- # Check the queue is there
- assert self._notification_queue
- notification = self._notification_queue.pop()
-
+ def _scheduler_notification_handler(self, notification):
if notification.notification_type == NotificationType.MESSAGE:
self._context.messenger.message(notification.message)
elif notification.notification_type == NotificationType.INTERRUPT:
@@ -1638,6 +1695,7 @@ class Stream:
elif notification.notification_type == NotificationType.JOB_START:
self._state.add_task(notification.job_action, notification.full_name, notification.time)
elif notification.notification_type == NotificationType.JOB_COMPLETE:
+ # State between scheduler & stream is different if run in a subprocess
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
self._state.fail_task(notification.job_action, notification.full_name, notification.element)
@@ -1651,13 +1709,31 @@ class Stream:
self._scheduler_suspended = not self._scheduler_suspended
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(*notification.task_error)
+ elif notification.notification_type == NotificationType.EXCEPTION:
+ raise notification.exception
else:
raise StreamError("Unrecognised notification type received")
def _notify(self, notification):
- # Stream to scheduler notifcations on left side
- self._notification_queue.appendleft(notification)
- self._notifier()
+ # Mark that the notification is for the scheduler
+ # notification.for_scheduler = True
+ if self._notify_back:
+ self._notify_back.put(notification)
+ else:
+ self._scheduler._stream_notification_handler(notification)
+
+ # The code to be run by the Stream's event loop while delegating
+ # work to a subprocess via run_in_subprocess()
+ def _loop(self):
+ assert self._notify_front
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notify_front.get_nowait()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ notification = None
+ break
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py
index 49b76a7..ec61078 100644
--- a/src/buildstream/_workspaces.py
+++ b/src/buildstream/_workspaces.py
@@ -472,7 +472,7 @@ class Workspaces:
# create_workspace permanent
#
def save_config(self):
- assert utils._is_main_process()
+ assert not utils._is_job_process()
config = {
"format-version": BST_WORKSPACE_FORMAT_VERSION,
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 106460b..4eb43d7 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -768,7 +768,7 @@ class Element(Plugin):
self.info("Resetting workspace state, last successful build is no longer in the cache")
# In case we are staging in the main process
- if utils._is_main_process():
+ if not utils._is_job_process():
context.get_workspaces().save_config()
for dep in self.dependencies(scope):
@@ -793,7 +793,7 @@ class Element(Plugin):
# In case we are running `bst shell`, this happens in the
# main process and we need to update the workspace config
- if utils._is_main_process():
+ if not utils._is_job_process():
context.get_workspaces().save_config()
result = dep.stage_artifact(
@@ -1580,7 +1580,7 @@ class Element(Plugin):
self._update_ready_for_runtime_and_cached()
if self._get_workspace() and self._cached_success():
- assert utils._is_main_process(), "Attempted to save workspace configuration from child process"
+ assert not utils._is_job_process(), "Attempted to save workspace configuration from child process"
#
# Note that this block can only happen in the
# main process, since `self._cached_success()` cannot
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index 5ec1c97..815cc50 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -59,7 +59,7 @@ class SandboxRemote(SandboxREAPI):
return
# gRPC doesn't support fork without exec, which is used in the main process.
- assert not utils._is_main_process()
+ assert utils._is_job_process()
self.storage_url = config.storage_service["url"]
self.exec_url = config.exec_service["url"]
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index b6716a2..b1d1585 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -58,6 +58,9 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
# Main process pid
_MAIN_PID = os.getpid()
+# This is different to _MAIN_PID if running a subprocessed stream entry point
+_STREAM_PID = _MAIN_PID
+
# The number of threads in the main process at startup.
# This is 1 except for certain test environments (xdist/execnet).
_INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1
@@ -770,13 +773,18 @@ def _pretty_size(size, dec_places=0):
return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
-# _is_main_process()
+# _is_job_process()
#
-# Return whether we are in the main process or not.
+# Return whether we are in a job process.
#
-def _is_main_process():
- assert _MAIN_PID is not None
- return os.getpid() == _MAIN_PID
+def _is_job_process():
+ assert _STREAM_PID is not None
+ return os.getpid() != _STREAM_PID
+
+
+def _set_stream_pid() -> None:
+ global _STREAM_PID # pylint: disable=global-statement
+ _STREAM_PID = os.getpid()
# Remove a path and any empty directories leading up to it.
@@ -1517,10 +1525,15 @@ def _is_single_threaded():
# Use psutil as threading.active_count() doesn't include gRPC threads.
process = psutil.Process()
- if process.pid == _MAIN_PID:
- expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
- else:
- expected_num_threads = 1
+ expected_num_threads = 1
+
+ if process.pid == _STREAM_PID:
+ if _STREAM_PID != _MAIN_PID:
+ # multiprocessing.Queue() has a background thread for object pickling,
+ # see https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
+ expected_num_threads += 1
+ else:
+ expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
# gRPC threads are not joined when shut down. Wait for them to exit.
wait = 0.1
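Finally, the utils.py change distinguishes three process roles: the original
main process (_MAIN_PID), the stream subprocess (which calls _set_stream_pid()
after the fork), and job processes (everything else). A small sketch of why
_is_job_process() keeps working in forked children (fork context only, so
POSIX; the helper names follow the commit, the rest is illustrative):

    import multiprocessing as mp
    import os

    _STREAM_PID = os.getpid()   # set at import time, as in utils.py

    def _is_job_process():
        # Forked children inherit _STREAM_PID, so their own pid differs
        return os.getpid() != _STREAM_PID

    def job():
        assert _is_job_process()
        print("pid {} is a job process".format(os.getpid()))

    if __name__ == "__main__":
        assert not _is_job_process()      # we are the stream process
        proc = mp.get_context("fork").Process(target=job)
        proc.start()
        proc.join()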