You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:35:43 UTC

[buildstream] 04/17: Add notifications for session_start & task_groups

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tpollard/buildsubprocess
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b736765725a12ca61f759505269be7262a78c280
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index df9819b..d703cd6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -70,6 +70,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -92,8 +94,8 @@ class Notification:
         element=None,
         message=None,
         task_error=None,
-        for_scheduler=None,
-        exception=None
+        exception=None,
+        task_groups=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -104,6 +106,7 @@ class Notification:
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -236,6 +239,14 @@ class Scheduler:
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state's task groups to the frontend if we're running in the subprocess
+        if self._notify_front:
+            # Clear each group's reference to the state so it isn't pickled with the notification
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c02ba6a..a6edf73 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1425,7 +1425,10 @@ class Stream:
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
 
@@ -1696,7 +1699,9 @@ class Stream:
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1721,6 +1726,8 @@ class Stream:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
             raise notification.exception.re_raise()
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")