You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:20:49 UTC
[buildstream] 12/19: Add support for dynamic queue status reporting to frontend State()
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 75e9d023ee424f4fb24e83908e199db891c54906
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 14:23:55 2019 +0100
Add support for dynamic queue status reporting to frontend State()
---
src/buildstream/_scheduler/queues/queue.py | 3 ++
src/buildstream/_scheduler/scheduler.py | 15 ++++-----
src/buildstream/_state.py | 50 +++++++++++++++++++++++++-----
src/buildstream/_stream.py | 12 ++++++-
4 files changed, 62 insertions(+), 18 deletions(-)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 79cb162..77a9157 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -86,6 +86,9 @@ class Queue:
self._max_retries = scheduler.context.sched_network_retries
self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
+ self._scheduler._state.register_task_groups_changed_callback(
+ self._scheduler._update_task_groups, name=self.action_name
+ )
# destroy()
#
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index fbe2599..ac52b0e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -109,7 +109,7 @@ class Notification:
self.message = message
self.task_error = task_error # Tuple of domain & reason
self.exception = exception
- self.task_groups = task_groups
+ self.task_groups = task_groups # Tuple of queue name, complete name, task change, & optional element name
self.element_totals = element_totals
@@ -243,14 +243,6 @@ class Scheduler:
else:
status = SchedStatus.SUCCESS
- # Send the state taskgroups if we're running under the subprocess
- if subprocessed:
- # Don't pickle state
- for group in self._state.task_groups.values():
- group._state = None
- notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
- self._notify_front_queue.put(notification)
-
return status
# clear_queues()
@@ -658,6 +650,11 @@ class Scheduler:
if self._notify_back_queue:
self.loop.remove_reader(self._notify_back_queue._reader.fileno())
+ def _update_task_groups(self, name, complete_name, task, full_name=None):
+ if self._notify_front_queue:
+ changes = (name, complete_name, task, full_name)
+ self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
+
def __getstate__(self):
# The only use-cases for pickling in BuildStream at the time of writing
# are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index d85e348..c0c29e9 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -54,8 +54,10 @@ class TaskGroup:
#
def add_processed_task(self):
self.processed_tasks += 1
- for cb in self._state._task_groups_changed_cbs:
- cb()
+ for cb, name in self._state._task_groups_changed_cbs:
+ # Call the cb if it was registered for this group's name, or registered without a name
+ if name == self.name or name is None:
+ cb(name, self.complete_name, "processed_tasks")
# add_skipped_task()
#
@@ -65,9 +67,10 @@ class TaskGroup:
#
def add_skipped_task(self):
self.skipped_tasks += 1
-
- for cb in self._state._task_groups_changed_cbs:
- cb()
+ for cb, name in self._state._task_groups_changed_cbs:
+ # Call the cb if it was registered for this group's name, or registered without a name
+ if name == self.name or name is None:
+ cb(name, self.complete_name, "skipped_tasks")
# add_failed_task()
#
@@ -82,9 +85,10 @@ class TaskGroup:
#
def add_failed_task(self, full_name):
self.failed_tasks.append(full_name)
-
- for cb in self._state._task_groups_changed_cbs:
- cb()
+ for cb, name in self._state._task_groups_changed_cbs:
+ # Call the cb if it was registered for this group's name, or registered without a name
+ if name == self.name or name is None:
+ cb(name, self.complete_name, "failed_tasks", full_name)
# State
@@ -226,6 +230,36 @@ class State:
def unregister_task_failed_callback(self, callback):
self._task_failed_cbs.remove(callback)
+ # register_task_groups_changed_callback()
+ #
+ # Registers a callback to be notified when a task group has changed
+ #
+ # Args:
+ # callback (function): The callback to be notified
+ # name (str): Optional task group name to filter on, e.g. the action_name of a Queue. If None
+ # is given, the callback is triggered whenever any task group changes.
+ #
+ # Callback Args:
+ # name (str): The task group name the callback was registered with, e.g. 'build' (None if registered without a name)
+ # complete_name (str): The complete name of the task group, e.g. 'built'
+ # task (str): The TaskGroup attribute that changed: 'processed_tasks', 'skipped_tasks' or 'failed_tasks'.
+ # element_name (str): Optional, the full name of the failed element task; only passed when a task failed
+ #
+ def register_task_groups_changed_callback(self, callback, name=None):
+ self._task_groups_changed_cbs.append((callback, name))
+
+ # unregister_task_groups_changed_callback()
+ #
+ # Unregisters a callback previously registered by
+ # register_task_groups_changed_callback()
+ #
+ # Args:
+ # callback (function): The callback to be removed
+ # name (str): Optional task group name, e.g. the action_name of a Queue; must match the name given at registration
+ #
+ def unregister_task_groups_changed_callback(self, callback, name=None):
+ self._task_groups_changed_cbs.remove((callback, name))
+
##############################################
# Core-facing APIs for driving notifications #
##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4ac73e6..789e14a 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1778,7 +1778,17 @@ class Stream:
def _notification_handler(self, notification):
if notification.notification_type == NotificationType.TASK_GROUPS:
- self._state.task_groups = notification.task_groups
+ queue_name, complete_name, task_event, element_name = notification.task_groups
+ try:
+ group = self._state.task_groups[queue_name]
+ except KeyError:
+ # Queue not yet mirrored in front process, so create it & add it to status output
+ group = self._state.add_task_group(queue_name, complete_name)
+ if element_name is None:
+ count = getattr(group, task_event)
+ setattr(group, task_event, count + 1)
+ else:
+ getattr(group, task_event).append(element_name)
elif notification.notification_type == NotificationType.MESSAGE:
self._context.messenger.message(notification.message)
elif notification.notification_type == NotificationType.INTERRUPT: