You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:10 UTC
[buildstream] 12/16: Add support for dynamic queue status reporting
to frontend State()
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 72c0d08cfa18f6cfff250626a62f62a1745bfe85
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 14:23:55 2019 +0100
Add support for dynamic queue status reporting to frontend State()
---
src/buildstream/_scheduler/queues/queue.py | 2 ++
src/buildstream/_scheduler/scheduler.py | 15 ++++-----
src/buildstream/_state.py | 53 ++++++++++++++++++++++++------
src/buildstream/_stream.py | 12 ++++++-
4 files changed, 62 insertions(+), 20 deletions(-)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 664741b..e31259b 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -86,6 +86,8 @@ class Queue():
self._max_retries = scheduler.context.sched_network_retries
self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
+ self._scheduler._state.register_task_groups_changed_callback(self._scheduler._update_task_groups,
+ name=self.action_name)
# destroy()
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 74a688f..2b9f9e6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -105,7 +105,7 @@ class Notification():
self.message = message
self.task_error = task_error # Tuple of domain & reason
self.exception = exception
- self.task_groups = task_groups
+ self.task_groups = task_groups # Tuple of queue name, complete name, task change, & optional element name
self.element_totals = element_totals
@@ -242,14 +242,6 @@ class Scheduler():
else:
status = SchedStatus.SUCCESS
- # Send the state taskgroups if we're running under the subprocess
- if subprocessed:
- # Don't pickle state
- for group in self._state.task_groups.values():
- group._state = None
- notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
- self._notify_front_queue.put(notification)
-
return status
# clear_queues()
@@ -650,6 +642,11 @@ class Scheduler():
if self._notify_back_queue:
self.loop.remove_reader(self._notify_back_queue._reader.fileno())
+ def _update_task_groups(self, name, complete_name, task, full_name=None):
+ if self._notify_front_queue:
+ changes = (name, complete_name, task, full_name)
+ self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 310e12a..f35f07a 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -54,8 +54,10 @@ class TaskGroup():
#
def add_processed_task(self):
self.processed_tasks += 1
- for cb in self._state._task_groups_changed_cbs:
- cb()
+ for cb, name in self._state._task_groups_changed_cbs:
+ # If name matches group, or if name not given call the cb
+ if name == self.name or name is None:
+ cb(name, self.complete_name, 'processed_tasks')
# add_skipped_task()
#
@@ -65,9 +67,10 @@ class TaskGroup():
#
def add_skipped_task(self):
self.skipped_tasks += 1
-
- for cb in self._state._task_groups_changed_cbs:
- cb()
+ for cb, name in self._state._task_groups_changed_cbs:
+ # If name matches group, or if name not given call the cb
+ if name == self.name or name is None:
+ cb(name, self.complete_name, 'skipped_tasks')
# add_failed_task()
#
@@ -82,11 +85,10 @@ class TaskGroup():
#
def add_failed_task(self, full_name):
self.failed_tasks.append(full_name)
-
- for cb in self._state._task_groups_changed_cbs:
- cb()
-
-
+ for cb, name in self._state._task_groups_changed_cbs:
+ # If name matches group, or if name not given call the cb
+ if name == self.name or name is None:
+ cb(name, self.complete_name, 'failed_tasks', full_name)
# State
#
# The state data that is stored for the purpose of sharing with the frontend.
@@ -226,6 +228,37 @@ class State():
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
+ # register_task_groups_changed_callback()
+ #
+ # Registers a callback to be notified when a task group has changed
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ # name (str): Optional taskgroup related name, e.g. the action_name of a Queue. If None
+ # given then the callback will be triggered for any task group changing.
+ #
+ # Callback Args:
+ # name (str): The name of the task group, e.g. 'build'
+ # complete_name (str): The complete name of the task group, e.g. 'built'
+ # task(str): The full name of the task outcome, processed, skipped or failed.
+ # element_name (str): Optional if an element task failed, the element name
+ #
+ def register_task_groups_changed_callback(self, callback, name=None):
+ self._task_groups_changed_cbs.append((callback, name))
+
+ # unregister_task_groups_changed_callback()
+ #
+ # Unregisters a callback previously registered by
+ # register_task_groups_changed_callback()
+ #
+ # Args:
+ # callback (function): The callback to be removed
+ # name (str): Optional taskgroup related name, e.g. the action_name of a Queue
+ #
+ def unregister_task_groups_changed_callback(self, callback, name=None):
+ self._task_groups_changed_cbs.remove((callback, name))
+
+
##############################################
# Core-facing APIs for driving notifications #
##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 866cbd7..2e6e650 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1753,7 +1753,17 @@ class Stream():
def _notification_handler(self, notification):
if notification.notification_type == NotificationType.TASK_GROUPS:
- self._state.task_groups = notification.task_groups
+ queue_name, complete_name, task_event, element_name = notification.task_groups
+ try:
+ group = self._state.task_groups[queue_name]
+ except KeyError:
+ # Queue not yet mirrored in front process, so create it & add it to status output
+ group = self._state.add_task_group(queue_name, complete_name)
+ if element_name is None:
+ count = getattr(group, task_event)
+ setattr(group, task_event, count + 1)
+ else:
+ getattr(group, task_event).append(element_name)
elif notification.notification_type == NotificationType.MESSAGE:
self._context.messenger.message(notification.message)
elif notification.notification_type == NotificationType.INTERRUPT: