You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:45:01 UTC

[buildstream] 03/16: Stop pickling exceptions, regen once off queue

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 02a8fb4778deec93b33d2766f0d4f01637321977
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Sep 25 11:36:01 2019 +0100

    Stop pickling exceptions, regen once off queue
---
 src/buildstream/_exceptions.py          | 12 ++++++++++++
 src/buildstream/_scheduler/scheduler.py |  2 --
 src/buildstream/_stream.py              | 19 ++++++++-----------
 3 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 947b831..a6f726b 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -113,6 +113,8 @@ class BstError(Exception):
 
         super().__init__(message)
 
+        self.message = message
+
         # Additional error detail, these are used to construct detail
         # portions of the logging messages when encountered.
         #
@@ -378,3 +380,13 @@ class SkipJob(Exception):
 class ArtifactElementError(BstError):
     def __init__(self, message, *, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.ELEMENT, reason=reason)
+
+class SubprocessException(BstError):
+    def __init__(self, **kwargs):
+        super().__init__(kwargs['message'], detail=kwargs['detail'],
+                         domain=kwargs['domain'], reason=kwargs['reason'], temporary=kwargs['temporary'])
+        self.sandbox = kwargs['sandbox']
+        try:
+            self.terminated = kwargs['terminated']
+        except KeyError:
+            pass
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 5a3da69..2d152b2 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -609,8 +609,6 @@ class Scheduler():
         elif notification.notification_type == NotificationType.RETRY:
             self._failure_retry(notification.job_action, notification.element)
         else:
-            # Do not raise exception once scheduler process is separated
-            # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 7cb3515..bc78f72 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -34,7 +34,7 @@ from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
@@ -117,12 +117,12 @@ class Stream():
         utils._reset_main_pid()
         try:
             func(*args, **kwargs)
-        except Exception as e:
-            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+        except BstError as e:
+            # Send the exceptions members dict to be reraised in main process
+            exception_attrs = vars(e)
+            notify.put(Notification(NotificationType.EXCEPTION, exception=exception_attrs))
 
     def run_in_subprocess(self, func, *args, **kwargs):
-        print("Args: {}".format([*args]))
-        print("Kwargs: {}".format(list(kwargs.items())))
         assert not self._subprocess
 
         mp_context = mp.get_context(method='fork')
@@ -137,7 +137,6 @@ class Stream():
         args = list(args)
         args.insert(0, self._notify_front)
         args.insert(0, func)
-        print("launching subprocess:", process_name)
 
         self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
                                               kwargs=kwargs, name=process_name)
@@ -150,7 +149,6 @@ class Stream():
             self._subprocess.join(0.01)
             # if no exit code, go back to checking the message queue
             self._loop()
-        print("Stopping loop...")
 
         # Set main process back
         utils._reset_main_pid()
@@ -161,9 +159,9 @@ class Stream():
                 notification = self._notify_front.get_nowait()
                 self._scheduler_notification_handler(notification)
         except queue.Empty:
-            print("Finished processing notifications")
             pass
 
+
     # cleanup()
     #
     # Cleans up application state
@@ -1751,13 +1749,12 @@ class Stream():
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
-            raise notification.exception
+            # Regenerate the exception here, so we don't have to pickle it
+            raise SubprocessException(**notification.exception)
         else:
             raise StreamError("Unrecognised notification type received")
 
     def _notify(self, notification):
-        # Set that the notifcation is for the scheduler
-        #notification.for_scheduler = True
         if self._notify_back:
             self._notify_back.put(notification)
         else: