You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:20:39 UTC
[buildstream] 02/19: scheduler.py: Notification for last_task_error propagation
This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit f45ac38490af9b79420fca82bd2e3160be56e0cc
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Sep 10 15:10:04 2019 +0100
scheduler.py: Notification for last_task_error propagation
Add a notification for TASK_ERROR. As queues & job handlers will
be running in a different process from the front end, the global
last task error state held in the frontend process (managed in
_exceptions.py) needs to be updated via a notification.
This is used internally for the BST_TEST_SUITE.
---
src/buildstream/_scheduler/jobs/job.py | 4 ++--
src/buildstream/_scheduler/queues/queue.py | 4 ++--
src/buildstream/_scheduler/scheduler.py | 19 ++++++++++++++++++-
src/buildstream/_stream.py | 4 +++-
4 files changed, 25 insertions(+), 6 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e7866bc..4cb80b8 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -29,7 +29,7 @@ import sys
import traceback
# BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
+from ..._exceptions import ImplError, BstError, SkipJob
from ..._message import Message, MessageType, unconditional_messages
from ...types import FastEnum
from ... import _signals, utils
@@ -491,7 +491,7 @@ class Job:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope.message["domain"], envelope.message["reason"])
+ self._scheduler.set_last_task_error(envelope.message["domain"], envelope.message["reason"])
elif envelope.message_type is _MessageType.RESULT:
assert self._result is None
self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 986ac6c..79cb162 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..jobs import ElementJob, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
-from ..._exceptions import BstError, ImplError, set_last_task_error
+from ..._exceptions import BstError, ImplError
from ..._message import Message, MessageType
from ...types import FastEnum
@@ -326,7 +326,7 @@ class Queue:
#
# This just allows us stronger testing capability
#
- set_last_task_error(e.domain, e.reason)
+ self._scheduler.set_last_task_error(e.domain, e.reason)
except Exception: # pylint: disable=broad-except
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 171281b..fb666df 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -66,6 +66,7 @@ class NotificationType(FastEnum):
SUSPENDED = "suspended"
RETRY = "retry"
MESSAGE = "message"
+ TASK_ERROR = "task_error"
# Notification()
@@ -86,7 +87,8 @@ class Notification:
job_status=None,
time=None,
element=None,
- message=None
+ message=None,
+ task_error=None
):
self.notification_type = notification_type
self.full_name = full_name
@@ -95,6 +97,7 @@ class Notification:
self.time = time
self.element = element
self.message = message
+ self.task_error = task_error # Tuple of domain & reason
# Scheduler()
@@ -328,6 +331,20 @@ class Scheduler:
def notify_messenger(self, message):
self._notify(Notification(NotificationType.MESSAGE, message=message))
+ # set_last_task_error()
+ #
+ # Save the last error domain / reason reported from a child job or queue
+ # in the main process.
+ #
+ # Args:
+ # domain (ErrorDomain): Enum for the domain from which the error occurred
+ # reason (str): String identifier representing the reason for the error
+ #
+ def set_last_task_error(self, domain, reason):
+ task_error = domain, reason
+ notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
+ self._notify(notification)
+
#######################################################
# Local Private Methods #
#######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e0a8d92..d907e93 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -31,7 +31,7 @@ from fnmatch import fnmatch
from collections import deque
from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
from ._message import Message, MessageType
from ._scheduler import (
Scheduler,
@@ -1692,6 +1692,8 @@ class Stream:
self._scheduler_terminated = True
elif notification.notification_type == NotificationType.SUSPENDED:
self._scheduler_suspended = not self._scheduler_suspended
+ elif notification.notification_type == NotificationType.TASK_ERROR:
+ set_last_task_error(*notification.task_error)
else:
raise StreamError("Unrecognised notification type received")