You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:32:00 UTC
[buildstream] 03/10: Stop pickling exceptions, regen once off queue
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tpollard/streamasync
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit d2d8f4856411ce2dc0c32110c75f8f7f88512beb
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Sep 25 11:36:01 2019 +0100
Stop pickling exceptions, regen once off queue
---
src/buildstream/_exceptions.py | 13 ++++++++++++-
src/buildstream/_stream.py | 19 ++++++++-----------
2 files changed, 20 insertions(+), 12 deletions(-)
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 947b831..fcf0c9e 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -113,6 +113,8 @@ class BstError(Exception):
super().__init__(message)
+ self.message = message
+
# Additional error detail, these are used to construct detail
# portions of the logging messages when encountered.
#
@@ -352,7 +354,6 @@ class StreamError(BstError):
self.terminated = terminated
-
# AppError
#
# Raised from the frontend App directly
@@ -378,3 +379,13 @@ class SkipJob(Exception):
class ArtifactElementError(BstError):
def __init__(self, message, *, detail=None, reason=None):
super().__init__(message, detail=detail, domain=ErrorDomain.ELEMENT, reason=reason)
+
+class SubprocessException(BstError):
+ def __init__(self, **kwargs):
+ super().__init__(kwargs['message'], detail=kwargs['detail'],
+ domain=kwargs['domain'], reason=kwargs['reason'], temporary=kwargs['temporary'])
+ self.sandbox = kwargs['sandbox']
+ try:
+ self.terminated = kwargs['terminated']
+ except KeyError:
+ pass
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 5f7eb52..9b2fd5c 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -34,7 +34,7 @@ from contextlib import contextmanager, suppress
from fnmatch import fnmatch
from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException
from ._message import Message, MessageType
from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
@@ -117,12 +117,12 @@ class Stream():
utils._reset_main_pid()
try:
func(*args, **kwargs)
- except Exception as e:
- notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+ except BstError as e:
+ # Send the exception's attribute dict so it can be re-raised in the main process
+ exception_attrs = vars(e)
+ notify.put(Notification(NotificationType.EXCEPTION, exception=exception_attrs))
def run_in_subprocess(self, func, *args, **kwargs):
- print("Args: {}".format([*args]))
- print("Kwargs: {}".format(list(kwargs.items())))
assert not self._subprocess
mp_context = mp.get_context(method='fork')
@@ -137,7 +137,6 @@ class Stream():
args = list(args)
args.insert(0, self._notify_front)
args.insert(0, func)
- print("launching subprocess:", process_name)
self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
kwargs=kwargs, name=process_name)
@@ -150,7 +149,6 @@ class Stream():
self._subprocess.join(0.01)
# if no exit code, go back to checking the message queue
self._loop()
- print("Stopping loop...")
# Set main process back
utils._reset_main_pid()
@@ -161,9 +159,9 @@ class Stream():
notification = self._notify_front.get_nowait()
self._scheduler_notification_handler(notification)
except queue.Empty:
- print("Finished processing notifications")
pass
+
# cleanup()
#
# Cleans up application state
@@ -1761,13 +1759,12 @@ class Stream():
elif notification.notification_type == NotificationType.TASK_ERROR:
set_last_task_error(*notification.task_error)
elif notification.notification_type == NotificationType.EXCEPTION:
- raise notification.exception
+ # Regenerate the exception here, so we don't have to pickle it
+ raise SubprocessException(**notification.exception)
else:
raise StreamError("Unrecognised notification type received")
def _notify(self, notification):
- # Set that the notifcation is for the scheduler
- #notification.for_scheduler = True
if self._notify_back:
self._notify_back.put(notification)
else: