You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:46:57 UTC
[buildstream] 07/18: Make it more verbose with front & back
notifications
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/subrebase
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e87ec341fe53593741e59593489edc27a7857e3d
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 17:46:46 2019 +0100
Make it more verbose with front & back notifications
---
src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------
src/buildstream/_stream.py | 67 ++++++++++++++++-----------------
2 files changed, 56 insertions(+), 59 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index f974553..5fad1f2 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -157,8 +157,8 @@ class Scheduler:
self._casd_process = None # handle to the casd process for monitoring purpose
# Bidirectional pipe to send notifications back to the Scheduler's owner
- self._notify_front = None
- self._notify_back = None
+ self._notify_front_queue = None
+ self._notify_back_queue = None
# Notifier callback to use if not running in a subprocess
self._notifier = notifier
@@ -190,7 +190,7 @@ class Scheduler:
asyncio.set_event_loop(self.loop)
# Notify that the loop has been created
- self._notify(Notification(NotificationType.RUNNING))
+ self._notify_front(Notification(NotificationType.RUNNING))
# Add timeouts
self.loop.call_later(1, self._tick)
@@ -204,7 +204,7 @@ class Scheduler:
_watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
# Add notification handler
- if self._notify_back:
+ if self._notify_back_queue:
self.loop.call_later(0.01, self._loop)
# Start the profiler
@@ -225,7 +225,7 @@ class Scheduler:
self.loop = None
# Notify that the loop has been reset
- self._notify(Notification(NotificationType.RUNNING))
+ self._notify_front(Notification(NotificationType.RUNNING))
if failed:
status = SchedStatus.ERROR
@@ -235,12 +235,12 @@ class Scheduler:
status = SchedStatus.SUCCESS
# Send the state taskgroups if we're running under the subprocess
- if self._notify_front:
+ if self._notify_front_queue:
# Don't pickle state
for group in self._state.task_groups.values():
group._state = None
notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
- self._notify_front.put(notification)
+ self._notify_front_queue.put(notification)
return status
@@ -279,7 +279,7 @@ class Scheduler:
# Notify the frontend that we're terminated as it might be
# from an interactive prompt callback or SIGTERM
- self._notify(Notification(NotificationType.TERMINATED))
+ self._notify_front(Notification(NotificationType.TERMINATED))
self.loop.call_soon(self._terminate_jobs_real)
# Block this until we're finished terminating jobs,
@@ -342,7 +342,7 @@ class Scheduler:
job_status=status,
element=element_info,
)
- self._notify(notification)
+ self._notify_front(notification)
self._sched()
# notify_messenger()
@@ -354,7 +354,7 @@ class Scheduler:
# handler, as assigned by context's messenger.
#
def notify_messenger(self, message):
- self._notify(Notification(NotificationType.MESSAGE, message=message))
+ self._notify_front(Notification(NotificationType.MESSAGE, message=message))
# set_last_task_error()
#
@@ -368,7 +368,7 @@ class Scheduler:
def set_last_task_error(self, domain, reason):
task_error = domain, reason
notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
- self._notify(notification)
+ self._notify_front(notification)
#######################################################
# Local Private Methods #
@@ -407,7 +407,7 @@ class Scheduler:
job_action=job.action_name,
time=self._state.elapsed_time(start_time=self._starttime),
)
- self._notify(notification)
+ self._notify_front(notification)
job.start()
# _sched_queue_jobs()
@@ -497,7 +497,7 @@ class Scheduler:
self._suspendtime = datetime.datetime.now()
self.suspended = True
# Notify that we're suspended
- self._notify(Notification(NotificationType.SUSPENDED))
+ self._notify_front(Notification(NotificationType.SUSPENDED))
for job in self._active_jobs:
job.suspend()
@@ -511,9 +511,9 @@ class Scheduler:
job.resume()
self.suspended = False
# Notify that we're unsuspended
- self._notify(Notification(NotificationType.SUSPENDED))
+ self._notify_front(Notification(NotificationType.SUSPENDED))
self._starttime += datetime.datetime.now() - self._suspendtime
- self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
+ self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
self._suspendtime = None
# _interrupt_event():
@@ -529,7 +529,7 @@ class Scheduler:
return
notification = Notification(NotificationType.INTERRUPT)
- self._notify(notification)
+ self._notify_front(notification)
# _terminate_event():
#
@@ -582,7 +582,7 @@ class Scheduler:
# Regular timeout for driving status in the UI
def _tick(self):
- self._notify(Notification(NotificationType.TICK))
+ self._notify_front(Notification(NotificationType.TICK))
self.loop.call_later(1, self._tick)
def _failure_retry(self, action_name, unique_id):
@@ -597,14 +597,14 @@ class Scheduler:
queue._task_group.failed_tasks.remove(element._get_full_name())
queue.enqueue([element])
- def _notify(self, notification):
+ def _notify_front(self, notification):
# Check if we need to call the notifier callback
- if self._notify_front:
- self._notify_front.put(notification)
+ if self._notify_front_queue:
+ self._notify_front_queue.put(notification)
else:
self._notifier(notification)
- def _stream_notification_handler(self, notification):
+ def _notification_handler(self, notification):
if notification.notification_type == NotificationType.TERMINATE:
self.terminate_jobs()
elif notification.notification_type == NotificationType.QUIT:
@@ -621,12 +621,12 @@ class Scheduler:
raise ValueError("Unrecognised notification type received")
def _loop(self):
- assert self._notify_back
+ assert self._notify_back_queue
# Check for and process new messages
while True:
try:
- notification = self._notify_back.get_nowait()
- self._stream_notification_handler(notification)
+ notification = self._notify_back_queue.get_nowait()
+ self._notification_handler(notification)
except queue.Empty:
notification = None
break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 44a1797..fe49bec 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -108,16 +108,15 @@ class Stream:
context.messenger.set_state(self._state)
- self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+ self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler)
self._session_start_callback = session_start_callback
self._ticker_callback = ticker_callback
self._interrupt_callback = interrupt_callback
- self._notifier = self._scheduler._stream_notification_handler # Assign the schedulers notification handler
self._scheduler_running = False
self._scheduler_terminated = False
self._scheduler_suspended = False
- self._notify_front = None
- self._notify_back = None
+ self._notify_front_queue = None
+ self._notify_back_queue = None
# init()
#
@@ -146,14 +145,14 @@ class Stream:
mp_context = mp.get_context(method="fork")
process_name = "stream-{}".format(func.__name__)
- self._notify_front = mp.Queue()
- self._notify_back = mp.Queue()
+ self._notify_front_queue = mp.Queue()
+ self._notify_back_queue = mp.Queue()
# Tell the scheduler to not use the notifier callback
- self._scheduler._notify_front = self._notify_front
- self._scheduler._notify_back = self._notify_back
+ self._scheduler._notify_front_queue = self._notify_front_queue
+ self._scheduler._notify_back_queue = self._notify_back_queue
args = list(args)
- args.insert(0, self._notify_front)
+ args.insert(0, self._notify_front_queue)
args.insert(0, func)
self._subprocess = mp_context.Process(
@@ -173,8 +172,8 @@ class Stream:
# Ensure no more notifcations to process
try:
while True:
- notification = self._notify_front.get_nowait()
- self._scheduler_notification_handler(notification)
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
except queue.Empty:
print("Finished processing notifications")
pass
@@ -185,7 +184,7 @@ class Stream:
#
def cleanup(self):
# Close the notification queue
- for q in [self._notify_back, self._notify_front]:
+ for q in [self._notify_back_queue, self._notify_front_queue]:
if q is not None:
q.close()
# self._notification_queue.cancel_join_thread()
@@ -1157,7 +1156,7 @@ class Stream:
#
def terminate(self):
notification = Notification(NotificationType.TERMINATE)
- self._notify(notification)
+ self._notify_back(notification)
# quit()
#
@@ -1167,7 +1166,7 @@ class Stream:
#
def quit(self):
notification = Notification(NotificationType.QUIT)
- self._notify(notification)
+ self._notify_back(notification)
# suspend()
#
@@ -1177,11 +1176,11 @@ class Stream:
def suspend(self):
# Send the notification to suspend jobs
notification = Notification(NotificationType.SUSPEND)
- self._notify(notification)
+ self._notify_back(notification)
yield
# Unsuspend jobs on context exit
notification = Notification(NotificationType.UNSUSPEND)
- self._notify(notification)
+ self._notify_back(notification)
#############################################################
# Private Methods #
@@ -1412,7 +1411,7 @@ class Stream:
#
def _failure_retry(self, action_name, unique_id):
notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
- self._notify(notification)
+ self._notify_back(notification)
# _run()
#
@@ -1426,17 +1425,11 @@ class Stream:
self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
if self._session_start_callback is not None:
- if self._notify_front:
- self._notify_front.put(Notification(NotificationType.START))
- else:
- self._session_start_callback()
+ self._notify_front(Notification(NotificationType.START))
# Also send through the session & total elements list lengths for status rendering
element_totals = str(len(self.session_elements)), str(len(self.total_elements))
- if self._notify_front:
- self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
- else:
- self.len_session_elements, self.len_total_elements = element_totals
+ self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
@@ -1706,7 +1699,7 @@ class Stream:
return element_targets, artifact_refs
- def _scheduler_notification_handler(self, notification):
+ def _notification_handler(self, notification):
if notification.notification_type == NotificationType.TASK_GROUPS:
self._state.task_groups = notification.task_groups
elif notification.notification_type == NotificationType.MESSAGE:
@@ -1741,23 +1734,27 @@ class Stream:
else:
raise StreamError("Unrecognised notification type received")
- def _notify(self, notification):
- # Set that the notifcation is for the scheduler
- # notification.for_scheduler = True
- if self._notify_back:
- self._notify_back.put(notification)
+ def _notify_back(self, notification):
+ if self._notify_back_queue:
+ self._notify_back_queue.put(notification)
+ else:
+ self._scheduler._notification_handler(notification)
+
+ def _notify_front(self, notification):
+ if self._notify_front_queue:
+ self._notify_front_queue.put(notification)
else:
- self._scheduler._stream_notification_handler(notification)
+ self._notification_handler(notification)
# The code to be run by the Stream's event loop while delegating
# work to a subprocess with the @subprocessed decorator
def _loop(self):
- assert self._notify_front
+ assert self._notify_front_queue
# Check for and process new messages
while True:
try:
- notification = self._notify_front.get_nowait()
- self._scheduler_notification_handler(notification)
+ notification = self._notify_front_queue.get_nowait()
+ self._notification_handler(notification)
except queue.Empty:
notification = None
break