You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:20:42 UTC
[buildstream] 05/19: Add notifications for session_start &
task_groups
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit cf42285881dc7e75ff5278fc9ccc71e93c97dd80
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100
Add notifications for session_start & task_groups
---
src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
src/buildstream/_stream.py | 11 +++++++++--
2 files changed, 22 insertions(+), 4 deletions(-)
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 8a2dbf6..07263df 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -69,6 +69,8 @@ class NotificationType(FastEnum):
MESSAGE = "message"
TASK_ERROR = "task_error"
EXCEPTION = "exception"
+ START = "start"
+ TASK_GROUPS = "task_groups"
# Notification()
@@ -91,8 +93,8 @@ class Notification:
element=None,
message=None,
task_error=None,
- for_scheduler=None,
- exception=None
+ exception=None,
+ task_groups=None
):
self.notification_type = notification_type
self.full_name = full_name
@@ -103,6 +105,7 @@ class Notification:
self.message = message
self.task_error = task_error # Tuple of domain & reason
self.exception = exception
+ self.task_groups = task_groups
# Scheduler()
@@ -228,6 +231,14 @@ class Scheduler:
else:
status = SchedStatus.SUCCESS
+ # Send the state taskgroups if we're running under the subprocess
+ if self._notify_front:
+ # Don't pickle state
+ for group in self._state.task_groups.values():
+ group._state = None
+ notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+ self._notify_front.put(notification)
+
return status
# clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 1b4ba0e..7bb2f5d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1448,7 +1448,10 @@ class Stream:
self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
if self._session_start_callback is not None:
- self._session_start_callback()
+ if self._notify_front:
+ self._notify_front.put(Notification(NotificationType.START))
+ else:
+ self._session_start_callback()
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
@@ -1739,7 +1742,9 @@ class Stream:
return element_targets, artifact_refs
def _scheduler_notification_handler(self, notification):
- if notification.notification_type == NotificationType.MESSAGE:
+ if notification.notification_type == NotificationType.TASK_GROUPS:
+ self._state.task_groups = notification.task_groups
+ elif notification.notification_type == NotificationType.MESSAGE:
self._context.messenger.message(notification.message)
elif notification.notification_type == NotificationType.INTERRUPT:
self._interrupt_callback()
@@ -1764,6 +1769,8 @@ class Stream:
set_last_task_error(*notification.task_error)
elif notification.notification_type == NotificationType.EXCEPTION:
raise notification.exception.re_raise()
+ elif notification.notification_type == NotificationType.START:
+ self._session_start_callback()
else:
raise StreamError("Unrecognised notification type received")