You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:35:43 UTC
[buildstream] 04/17: Add notifications for session_start & task_groups
This is an automated email from the ASF dual-hosted git repository.
tvb pushed a commit to branch tpollard/buildsubprocess
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit b736765725a12ca61f759505269be7262a78c280
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100
Add notifications for session_start & task_groups
---
src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
src/buildstream/_stream.py | 11 +++++++++--
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index df9819b..d703cd6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -70,6 +70,8 @@ class NotificationType(FastEnum):
MESSAGE = "message"
TASK_ERROR = "task_error"
EXCEPTION = "exception"
+ START = "start"
+ TASK_GROUPS = "task_groups"
# Notification()
@@ -92,8 +94,8 @@ class Notification:
element=None,
message=None,
task_error=None,
- for_scheduler=None,
- exception=None
+ exception=None,
+ task_groups=None
):
self.notification_type = notification_type
self.full_name = full_name
@@ -104,6 +106,7 @@ class Notification:
self.message = message
self.task_error = task_error # Tuple of domain & reason
self.exception = exception
+ self.task_groups = task_groups
# Scheduler()
@@ -236,6 +239,14 @@ class Scheduler:
else:
status = SchedStatus.SUCCESS
+ # Send the state's task groups to the front end if we're running in the subprocess
+ if self._notify_front:
+ # Clear each group's reference to the state so that it is not pickled along with the groups
+ for group in self._state.task_groups.values():
+ group._state = None
+ notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+ self._notify_front.put(notification)
+
return status
# clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c02ba6a..a6edf73 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1425,7 +1425,10 @@ class Stream:
self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
if self._session_start_callback is not None:
- self._session_start_callback()
+ if self._notify_front:
+ self._notify_front.put(Notification(NotificationType.START))
+ else:
+ self._session_start_callback()
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process_manager())
@@ -1696,7 +1699,9 @@ class Stream:
return element_targets, artifact_refs
def _scheduler_notification_handler(self, notification):
- if notification.notification_type == NotificationType.MESSAGE:
+ if notification.notification_type == NotificationType.TASK_GROUPS:
+ self._state.task_groups = notification.task_groups
+ elif notification.notification_type == NotificationType.MESSAGE:
self._context.messenger.message(notification.message)
elif notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
@@ -1721,6 +1726,8 @@ class Stream:
set_last_task_error(*notification.task_error)
elif notification.notification_type == NotificationType.EXCEPTION:
raise notification.exception.re_raise()
+ elif notification.notification_type == NotificationType.START:
+ self._session_start_callback()
else:
raise StreamError("Unrecognised notification type received")