You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:20:42 UTC

[buildstream] 05/19: Add notifications for session_start & task_groups

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit cf42285881dc7e75ff5278fc9ccc71e93c97dd80
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 8a2dbf6..07263df 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -69,6 +69,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -91,8 +93,8 @@ class Notification:
         element=None,
         message=None,
         task_error=None,
-        for_scheduler=None,
-        exception=None
+        exception=None,
+        task_groups=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -103,6 +105,7 @@ class Notification:
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -228,6 +231,14 @@ class Scheduler:
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state's task groups to the frontend if we're running in the subprocess
+        if self._notify_front:
+            # Drop each group's reference to the state so the state isn't pickled with it
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 1b4ba0e..7bb2f5d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1448,7 +1448,10 @@ class Stream:
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1739,7 +1742,9 @@ class Stream:
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1764,6 +1769,8 @@ class Stream:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
             raise notification.exception.re_raise()
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")