You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:00 UTC
[buildstream] 02/16: Add in dual queue implementation for
subprocess build
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 889a6ea39390214d62294f5eee5e37b1a69f8c43
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Sep 16 12:20:06 2019 +0100
Add in dual queue implementation for subprocess build
---
src/buildstream/_scheduler/scheduler.py | 42 ++++++++---
src/buildstream/_stream.py | 123 ++++++++++++++++++++++++++------
src/buildstream/utils.py | 7 +-
3 files changed, 142 insertions(+), 30 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index c85c141..5a3da69 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,6 +24,7 @@ import asyncio
from itertools import chain
import signal
import datetime
+import queue
# Local imports
from .resources import Resources
@@ -64,6 +65,7 @@ class NotificationType(FastEnum):
RETRY = "retry"
MESSAGE = "message"
TASK_ERROR = "task_error"
+ EXCEPTION = "exception"
# Notification()
@@ -85,7 +87,9 @@ class Notification():
time=None,
element=None,
message=None,
- task_error=None):
+ task_error=None,
+ for_scheduler=False,
+ exception=None):
self.notification_type = notification_type
self.full_name = full_name
self.job_action = job_action
@@ -94,6 +98,7 @@ class Notification():
self.element = element
self.message = message
self.task_error = task_error # Tuple of domain & reason
+ self.exception = exception
# Scheduler()
@@ -119,7 +124,7 @@ class Notification():
class Scheduler():
def __init__(self, context,
- start_time, state, notification_queue, notifier):
+ start_time, state, notifier):
#
# Public members
@@ -143,8 +148,10 @@ class Scheduler():
self._state = state
self._casd_process = None # handle to the casd process for monitoring purpose
- # Bidirectional queue to send notifications back to the Scheduler's owner
- self._notification_queue = notification_queue
+        # Queues for exchanging notifications with the Scheduler's owner (set when run in a subprocess)
+ self._notify_front = None
+ self._notify_back = None
+ # Notifier callback to use if not running in a subprocess
self._notifier = notifier
self.resources = Resources(context.sched_builders,
@@ -189,6 +196,10 @@ class Scheduler():
self._casd_process = casd_process
_watcher = asyncio.get_child_watcher()
_watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+
+ # Add notification handler
+ if self._notify_back:
+ self.loop.call_later(0.01, self._loop)
# Start the profiler
with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -580,12 +591,13 @@ class Scheduler():
queue.enqueue([element])
def _notify(self, notification):
- # Scheduler to Stream notifcations on right side
- self._notification_queue.append(notification)
- self._notifier()
+ # Check if we need to call the notifier callback
+ if self._notify_front:
+ self._notify_front.put(notification)
+ else:
+ self._notifier(notification)
- def _stream_notification_handler(self):
- notification = self._notification_queue.popleft()
+ def _stream_notification_handler(self, notification):
if notification.notification_type == NotificationType.TERMINATE:
self.terminate_jobs()
elif notification.notification_type == NotificationType.QUIT:
@@ -601,6 +613,18 @@ class Scheduler():
# as we don't want to pickle exceptions between processes
raise ValueError("Unrecognised notification type received")
+ def _loop(self):
+ assert self._notify_back
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notify_back.get_nowait()
+ self._stream_notification_handler(notification)
+ except queue.Empty:
+ notification = None
+ break
+ self.loop.call_later(0.01, self._loop)
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 63c09da..7cb3515 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,9 @@
# Jürg Billeter <ju...@codethink.co.uk>
# Tristan Maat <tr...@codethink.co.uk>
+import asyncio
+import functools
+import multiprocessing as mp
import os
import sys
import stat
@@ -26,9 +29,9 @@ import shlex
import shutil
import tarfile
import tempfile
+import queue
from contextlib import contextmanager, suppress
from fnmatch import fnmatch
-from collections import deque
from ._artifactelement import verify_artifact_ref, ArtifactElement
from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
@@ -79,13 +82,15 @@ class Stream():
self._project = None
self._pipeline = None
self._state = State(session_start) # Owned by Stream, used by Core to set state
- self._notification_queue = deque()
+ #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
+ self._subprocess = None
self._starttime = session_start # Synchronised with Scheduler's relative start time
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
- self._scheduler_notification_handler)
+ # Scheduler may use callback for notification depending on whether it's subprocessed
+ self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+
self._first_non_track_queue = None
self._session_start_callback = session_start_callback
self._ticker_callback = ticker_callback
@@ -94,6 +99,8 @@ class Stream():
self._scheduler_running = False
self._scheduler_terminated = False
self._scheduler_suspended = False
+ self._notify_front = None
+ self._notify_back = None
# init()
#
@@ -104,11 +111,69 @@ class Stream():
self._artifacts = self._context.artifactcache
self._sourcecache = self._context.sourcecache
+ @staticmethod
+ def _subprocess_main(func, notify, *args, **kwargs):
+ # Set main process
+ utils._reset_main_pid()
+ try:
+ func(*args, **kwargs)
+ except Exception as e:
+ notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+
+ def run_in_subprocess(self, func, *args, **kwargs):
+ print("Args: {}".format([*args]))
+ print("Kwargs: {}".format(list(kwargs.items())))
+ assert not self._subprocess
+
+ mp_context = mp.get_context(method='fork')
+ process_name = "stream-{}".format(func.__name__)
+
+ self._notify_front = mp.Queue()
+ self._notify_back = mp.Queue()
+ # Tell the scheduler to not use the notifier callback
+ self._scheduler._notify_front = self._notify_front
+ self._scheduler._notify_back = self._notify_back
+
+ args = list(args)
+ args.insert(0, self._notify_front)
+ args.insert(0, func)
+ print("launching subprocess:", process_name)
+
+ self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+ kwargs=kwargs, name=process_name)
+
+ self._subprocess.start()
+
+ # TODO connect signal handlers with asyncio
+ while self._subprocess.exitcode is None:
+ # check every given time interval on subprocess state
+ self._subprocess.join(0.01)
+ # if no exit code, go back to checking the message queue
+ self._loop()
+ print("Stopping loop...")
+
+ # Set main process back
+ utils._reset_main_pid()
+
+        # Ensure there are no more notifications to process
+ try:
+ while True:
+ notification = self._notify_front.get_nowait()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ print("Finished processing notifications")
+ pass
+
# cleanup()
#
# Cleans up application state
#
def cleanup(self):
+ # Close the notification queue
+ for q in [self._notify_back, self._notify_front]:
+ if q is not None:
+ q.close()
+ #self._notification_queue.cancel_join_thread()
if self._project:
self._project.cleanup()
@@ -233,6 +298,9 @@ class Stream():
return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
usebuildtree=buildtree)
+ def build(self, *args, **kwargs):
+ self.run_in_subprocess(self._build, *args, **kwargs)
+
# build()
#
# Builds (assembles) elements in the pipeline.
@@ -249,13 +317,13 @@ class Stream():
# If `remote` specified as None, then regular configuration will be used
# to determine where to push artifacts to.
#
- def build(self, targets, *,
- selection=PipelineSelection.PLAN,
- track_targets=None,
- track_except=None,
- track_cross_junctions=False,
- ignore_junction_targets=False,
- remote=None):
+ def _build(self, targets, *,
+ selection=PipelineSelection.PLAN,
+ track_targets=None,
+ track_except=None,
+ track_cross_junctions=False,
+ ignore_junction_targets=False,
+ remote=None):
use_config = True
if remote:
@@ -1657,11 +1725,7 @@ class Stream():
return element_targets, artifact_refs
- def _scheduler_notification_handler(self):
- # Check the queue is there
- assert self._notification_queue
- notification = self._notification_queue.pop()
-
+ def _scheduler_notification_handler(self, notification):
if notification.notification_type == NotificationType.MESSAGE:
self._context.messenger.message(notification.message)
elif notification.notification_type == NotificationType.INTERRUPT:
@@ -1671,6 +1735,7 @@ class Stream():
elif notification.notification_type == NotificationType.JOB_START:
self._state.add_task(notification.job_action, notification.full_name, notification.time)
elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            # State between scheduler & stream is different if run in a subprocess
self._state.remove_task(notification.job_action, notification.full_name)
if notification.job_status == JobStatus.FAIL:
self._state.fail_task(notification.job_action, notification.full_name,
@@ -1685,14 +1750,32 @@ class Stream():
self._scheduler_suspended = not self._scheduler_suspended
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(*notification.task_error)
+ elif notification.notification_type == NotificationType.EXCEPTION:
+ raise notification.exception
else:
raise StreamError("Unrecognised notification type received")
def _notify(self, notification):
- # Stream to scheduler notifcations on left side
- self._notification_queue.appendleft(notification)
- self._notifier()
-
+        # Set that the notification is for the scheduler
+ #notification.for_scheduler = True
+ if self._notify_back:
+ self._notify_back.put(notification)
+ else:
+ self._scheduler._stream_notification_handler(notification)
+
+    # The code to be run by the Stream while it waits on work that has been
+    # delegated to a subprocess via run_in_subprocess()
+ def _loop(self):
+ assert self._notify_front
+ # Check for and process new messages
+ while True:
+ try:
+ notification = self._notify_front.get_nowait()
+ self._scheduler_notification_handler(notification)
+ except queue.Empty:
+ notification = None
+ break
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index de7c14b..75978ef 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -739,6 +739,11 @@ def _is_main_process():
return os.getpid() == _MAIN_PID
+def _reset_main_pid():
+ global _MAIN_PID
+ _MAIN_PID = os.getpid()
+
+
# Recursively remove directories, ignoring file permissions as much as
# possible.
def _force_rmtree(rootpath, **kwargs):
@@ -1429,7 +1434,7 @@ def _is_single_threaded():
# gRPC threads are not joined when shut down. Wait for them to exit.
wait = 0.1
for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)):
- if process.num_threads() == expected_num_threads:
+ if process.num_threads() == expected_num_threads or (expected_num_threads + 1):
return True
time.sleep(wait)
return False