You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:44:27 UTC

[buildstream] 01/17: scheduler.py: Notification for last_task_error propagation

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubprocess
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9ffe0a0003d4dad79d898f85d41130d6e29248a5
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Sep 10 15:10:04 2019 +0100

    scheduler.py: Notification for last_task_error propagation
    
    Add a notification for TASK_ERROR. As queues & job handlers will
    be running in a different process to the front end, the global
    state kept by the exceptions module in the frontend process needs
    to be updated via a notification. This is used internally for the
    BST_TEST_SUITE.
---
 src/buildstream/_scheduler/jobs/job.py     |  4 ++--
 src/buildstream/_scheduler/queues/queue.py |  4 ++--
 src/buildstream/_scheduler/scheduler.py    | 19 ++++++++++++++++++-
 src/buildstream/_stream.py                 |  4 +++-
 4 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 61eff88..460f63d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -30,7 +30,7 @@ import sys
 import traceback
 
 # BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
+from ..._exceptions import ImplError, BstError, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
 from ... import _signals, utils
@@ -475,7 +475,7 @@ class Job:
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message["domain"], envelope.message["reason"])
+            self._scheduler.set_last_task_error(envelope.message["domain"], envelope.message["reason"])
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 295161e..71a34a8 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
-from ..._exceptions import BstError, ImplError, set_last_task_error
+from ..._exceptions import BstError, ImplError
 from ..._message import Message, MessageType
 from ...types import FastEnum
 
@@ -316,7 +316,7 @@ class Queue:
             #
             # This just allows us stronger testing capability
             #
-            set_last_task_error(e.domain, e.reason)
+            self._scheduler.set_last_task_error(e.domain, e.reason)
 
         except Exception:  # pylint: disable=broad-except
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a45da82..0700186 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -67,6 +67,7 @@ class NotificationType(FastEnum):
     SUSPENDED = "suspended"
     RETRY = "retry"
     MESSAGE = "message"
+    TASK_ERROR = "task_error"
 
 
 # Notification()
@@ -87,7 +88,8 @@ class Notification:
         job_status=None,
         time=None,
         element=None,
-        message=None
+        message=None,
+        task_error=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -96,6 +98,7 @@ class Notification:
         self.time = time
         self.element = element
         self.message = message
+        self.task_error = task_error  # Tuple of domain & reason
 
 
 # Scheduler()
@@ -335,6 +338,20 @@ class Scheduler:
     def notify_messenger(self, message):
         self._notify(Notification(NotificationType.MESSAGE, message=message))
 
+    # set_last_task_error()
+    #
+    # Notify the frontend of the last error domain / reason reported from a
+    # child job or queue, so that it can be saved in the main process.
+    #
+    # Args:
+    #    domain (ErrorDomain): Enum for the domain in which the error occurred
+    #    reason (str): String identifier representing the reason for the error
+    #
+    def set_last_task_error(self, domain, reason):
+        task_error = domain, reason
+        notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
+        self._notify(notification)
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c2945a2..ab270b4 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -32,7 +32,7 @@ from collections import deque
 from typing import List, Tuple
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import (
     Scheduler,
@@ -1649,6 +1649,8 @@ class Stream:
             self._scheduler_terminated = True
         elif notification.notification_type == NotificationType.SUSPENDED:
             self._scheduler_suspended = not self._scheduler_suspended
+        elif notification.notification_type == NotificationType.TASK_ERROR:
+            set_last_task_error(*notification.task_error)
         else:
             raise StreamError("Unrecognised notification type received")