You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:10 UTC

[buildstream] 12/16: Add support for dynamic queue status reporting to frontend State()

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 72c0d08cfa18f6cfff250626a62f62a1745bfe85
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 14:23:55 2019 +0100

    Add support for dynamic queue status reporting to frontend State()
---
 src/buildstream/_scheduler/queues/queue.py |  2 ++
 src/buildstream/_scheduler/scheduler.py    | 15 ++++-----
 src/buildstream/_state.py                  | 53 ++++++++++++++++++++++++------
 src/buildstream/_stream.py                 | 12 ++++++-
 4 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 664741b..e31259b 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -86,6 +86,8 @@ class Queue():
             self._max_retries = scheduler.context.sched_network_retries
 
         self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
+        self._scheduler._state.register_task_groups_changed_callback(self._scheduler._update_task_groups,
+                                                                     name=self.action_name)
 
     # destroy()
     #
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 74a688f..2b9f9e6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -105,7 +105,7 @@ class Notification():
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
-        self.task_groups = task_groups
+        self.task_groups = task_groups # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
 
 
@@ -242,14 +242,6 @@ class Scheduler():
         else:
             status = SchedStatus.SUCCESS
 
-        # Send the state taskgroups if we're running under the subprocess
-        if subprocessed:
-            # Don't pickle state
-            for group in self._state.task_groups.values():
-                group._state = None
-            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front_queue.put(notification)
-
         return status
 
     # clear_queues()
@@ -650,6 +642,11 @@ class Scheduler():
         if self._notify_back_queue:
             self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
+    def _update_task_groups(self, name, complete_name, task, full_name=None):
+        if self._notify_front_queue:
+            changes = (name, complete_name, task, full_name)
+            self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 310e12a..f35f07a 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -54,8 +54,10 @@ class TaskGroup():
     #
     def add_processed_task(self):
         self.processed_tasks += 1
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, 'processed_tasks')
 
     # add_skipped_task()
     #
@@ -65,9 +67,10 @@ class TaskGroup():
     #
     def add_skipped_task(self):
         self.skipped_tasks += 1
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, 'skipped_tasks')
 
     # add_failed_task()
     #
@@ -82,11 +85,10 @@ class TaskGroup():
     #
     def add_failed_task(self, full_name):
         self.failed_tasks.append(full_name)
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
-
-
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, 'failed_tasks', full_name)
 # State
 #
 # The state data that is stored for the purpose of sharing with the frontend.
@@ -226,6 +228,37 @@ class State():
     def unregister_task_failed_callback(self, callback):
         self._task_failed_cbs.remove(callback)
 
+    # register_task_groups_changed_callback()
+    #
+    # Registers a callback to be notified when a task group has changed
+    #
+    # Args:
+    #    callback (function): The callback to be notified
+    #    name (str): Optional task group name, e.g. the action_name of a Queue. If None is
+    #                given, the callback is triggered whenever any task group changes.
+    #
+    # Callback Args:
+    #    name (str): The name of the task group, e.g. 'build'
+    #    complete_name (str): The complete name of the task group, e.g. 'built'
+    #    task (str): The TaskGroup attribute that changed: 'processed_tasks', 'skipped_tasks' or 'failed_tasks'.
+    #    element_name (str): Optional, the full name of the element whose task failed (failed tasks only)
+    #
+    def register_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.append((callback, name))
+
+    # unregister_task_groups_changed_callback()
+    #
+    # Unregisters a callback previously registered by
+    # register_task_groups_changed_callback()
+    #
+    # Args:
+    #    callback (function): The callback to be removed
+    #    name (str): Optional taskgroup related name, e.g. the action_name of a Queue
+    #
+    def unregister_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.remove((callback, name))
+
+
     ##############################################
     # Core-facing APIs for driving notifications #
     ##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 866cbd7..2e6e650 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1753,7 +1753,17 @@ class Stream():
 
     def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
-            self._state.task_groups = notification.task_groups
+            queue_name, complete_name, task_event, element_name = notification.task_groups
+            try:
+                group = self._state.task_groups[queue_name]
+            except KeyError:
+                # Queue not yet mirrored in front process, so create it & add it to status output
+                group = self._state.add_task_group(queue_name, complete_name)
+            if element_name is None:
+                count = getattr(group, task_event)
+                setattr(group, task_event, count + 1)
+            else:
+                getattr(group, task_event).append(element_name)
         elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT: