You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:02 UTC
[buildstream] 08/19: Send scheduler notifications over a
multiprocessing queue
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e39c94064de749d31bf41987fccaa19fc129a975
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 10:18:49 2019 +0100
Send scheduler notifications over a multiprocessing queue
---
src/buildstream/_scheduler/scheduler.py | 17 ++++++++++-------
src/buildstream/_stream.py | 4 +++-
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9e120e4..14ecf30 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -98,7 +98,7 @@ class Notification:
class Scheduler():
def __init__(self, context,
- start_time, state, notification_handler,
+ start_time, state, notification_queue,
interrupt_callback=None,
ticker_callback=None,
interactive_failure=False):
@@ -131,8 +131,8 @@ class Scheduler():
self._cleanup_scheduled = False # Whether we have a cleanup job scheduled
self._cleanup_running = None # A running CleanupJob, or None
- # Callback to send notifications to report back to the Scheduler's owner
- self.notify = notification_handler
+ # Message to send notifications back to the Scheduler's owner
+ self._notification_queue = notification_queue
# Whether our exclusive jobs, like 'cleanup' are currently already
# waiting or active.
@@ -305,7 +305,7 @@ class Scheduler():
job_status=status,
failed_element=job.element_job,
element=element)
- self.notify(notification)
+ self._notify(notification)
if process_jobs:
# Now check for more jobs
@@ -369,7 +369,7 @@ class Scheduler():
full_name=job.name,
job_action=job.action_name,
elapsed_time=self.elapsed_time())
- self.notify(notification)
+ self._notify(notification)
job.start()
# Callback for the cache size job
@@ -589,7 +589,7 @@ class Scheduler():
return
notification = Notification(NotificationType.INTERRUPT)
- self.notify(notification)
+ self._notify(notification)
# _terminate_event():
#
@@ -649,9 +649,12 @@ class Scheduler():
# Regular timeout for driving status in the UI
def _tick(self):
notification = Notification(NotificationType.TICK)
- self.notify(notification)
+ self._notify(notification)
self.loop.call_later(1, self._tick)
+ def _notify(self, notification):
+ self._notification_queue.put(notification)
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 45bb41b..2d451c6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -21,6 +21,7 @@
import itertools
import functools
+import multiprocessing as mp
import os
import sys
import stat
@@ -76,6 +77,7 @@ class Stream():
#
# Private members
#
+ self._notification_queue = mp.Queue()
self._context = context
self._artifacts = None
self._sourcecache = None
@@ -85,7 +87,7 @@ class Stream():
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
interrupt_callback=interrupt_callback,
ticker_callback=ticker_callback,
interactive_failure=interactive_failure)