You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:38:17 UTC

[buildstream] 11/18: Add support for dynamic queue status reporting to frontend State()

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tpollard/subrebase
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9516bc3da7d1b0d0fb3b1d4b9e147b46647fa8f8
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 14:23:55 2019 +0100

    Add support for dynamic queue status reporting to frontend State()
---
 src/buildstream/_scheduler/queues/queue.py |  3 ++
 src/buildstream/_scheduler/scheduler.py    | 15 ++++-----
 src/buildstream/_state.py                  | 50 +++++++++++++++++++++++++-----
 src/buildstream/_stream.py                 | 12 ++++++-
 4 files changed, 62 insertions(+), 18 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 71a34a8..d9bae88 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -86,6 +86,9 @@ class Queue:
             self._max_retries = scheduler.context.sched_network_retries
 
         self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
+        self._scheduler._state.register_task_groups_changed_callback(
+            self._scheduler._update_task_groups, name=self.action_name
+        )
 
     # destroy()
     #
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index ab85c35..eb42c4f 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -109,7 +109,7 @@ class Notification:
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
-        self.task_groups = task_groups
+        self.task_groups = task_groups  # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
 
 
@@ -243,14 +243,6 @@ class Scheduler:
         else:
             status = SchedStatus.SUCCESS
 
-        # Send the state taskgroups if we're running under the subprocess
-        if subprocessed:
-            # Don't pickle state
-            for group in self._state.task_groups.values():
-                group._state = None
-            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front_queue.put(notification)
-
         return status
 
     # clear_queues()
@@ -658,6 +650,11 @@ class Scheduler:
         if self._notify_back_queue:
             self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
+    def _update_task_groups(self, name, complete_name, task, full_name=None):
+        if self._notify_front_queue:
+            changes = (name, complete_name, task, full_name)
+            self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index d85e348..c0c29e9 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -54,8 +54,10 @@ class TaskGroup:
     #
     def add_processed_task(self):
         self.processed_tasks += 1
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, "processed_tasks")
 
     # add_skipped_task()
     #
@@ -65,9 +67,10 @@ class TaskGroup:
     #
     def add_skipped_task(self):
         self.skipped_tasks += 1
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, "skipped_tasks")
 
     # add_failed_task()
     #
@@ -82,9 +85,10 @@ class TaskGroup:
     #
     def add_failed_task(self, full_name):
         self.failed_tasks.append(full_name)
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, "failed_tasks", full_name)
 
 
 # State
@@ -226,6 +230,36 @@ class State:
     def unregister_task_failed_callback(self, callback):
         self._task_failed_cbs.remove(callback)
 
+    # register_task_groups_changed_callback()
+    #
+    # Registers a callback to be notified when a task group has changed
+    #
+    # Args:
+    #    callback (function): The callback to be notified
+    #    name (str): Optional task group name, e.g. the action_name of a Queue. If None is
+    #                given, the callback is triggered whenever any task group changes.
+    #
+    # Callback Args:
+    #    name (str): The name of the task group, e.g. 'build'
+    #    complete_name (str): The complete name of the task group, e.g. 'built'
+    #    task (str): The TaskGroup attribute that changed: 'processed_tasks', 'skipped_tasks' or 'failed_tasks'
+    #    element_name (str): Optional, the full name of the element whose task failed
+    #
+    def register_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.append((callback, name))
+
+    # unregister_task_groups_changed_callback()
+    #
+    # Unregisters a callback previously registered by
+    # register_task_groups_changed_callback()
+    #
+    # Args:
+    #    callback (function): The callback to be removed
+    #    name (str): The optional task group name given at registration (must match), e.g. the action_name of a Queue
+    #
+    def unregister_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.remove((callback, name))
+
     ##############################################
     # Core-facing APIs for driving notifications #
     ##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 438e733..7f2cfe9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1735,7 +1735,17 @@ class Stream:
 
     def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
-            self._state.task_groups = notification.task_groups
+            queue_name, complete_name, task_event, element_name = notification.task_groups
+            try:
+                group = self._state.task_groups[queue_name]
+            except KeyError:
+                # Queue not yet mirrored in front process, so create it & add it to status output
+                group = self._state.add_task_group(queue_name, complete_name)
+            if element_name is None:
+                count = getattr(group, task_event)
+                setattr(group, task_event, count + 1)
+            else:
+                getattr(group, task_event).append(element_name)
         elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT: