You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:44:58 UTC

[buildstream] branch tpollard/buildsubtemp created (now aae4bbc)

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a change to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at aae4bbc  signal debug

This branch includes the following new commits:

     new 40ce52d  scheduler.py: Notification for last_task_error propagation
     new 889a6ea  Add in dual queue implementation for subprocess build
     new 02a8fb4  Stop pickling exceptions, regen once off queue
     new 3a5f5ae  Add notifications for session_start & task_groups
     new fdda15d  Explicitly ensure failed build sources are not pushed
     new 8c3f905  Add len of session/total elements members to Stream
     new 73889e1  Make it more verbose with front & back notifications
     new f0f2986  Move sched notification poll to loop reader
     new 7ace151  Failed shell to load via name if no plugintable state
     new 2fe0c6a  basic async in stream
     new 7870bc3  basic user SIG interrupt handling in Stream
     new 72c0d08  Add support for dynamic queue status reporting to frontend State()
     new 3f64449  Add support for logger print header displaying pipeline output
     new c357edd  Lint fixes
     new c753390  Fixup sched notification to frontend
     new aae4bbc  signal debug

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 13/16: Add support for logger print header displaying pipeline output

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 3f644493743d06b28d1c168f4a52518c052e0dbf
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 17:04:29 2019 +0100

    Add support for logger print header displaying pipeline output
---
 src/buildstream/_frontend/app.py        | 3 +++
 src/buildstream/_frontend/widget.py     | 6 +++++-
 src/buildstream/_scheduler/scheduler.py | 5 ++++-
 src/buildstream/_stream.py              | 8 ++++++++
 4 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index c021602..e146de9 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -227,6 +227,9 @@ class App():
                                   self._detail_profile,
                                   indent=INDENT)
 
+            # Register the Logline pipeline renderer callback in Stream
+            self.stream._pipeline_render_callback = self.logger.show_pipeline
+
             # Propagate pipeline feedback to the user
             self.context.messenger.set_message_handler(self._message_handler)
 
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index d40b87c..9a02588 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -512,7 +512,11 @@ class LogLine(Widget):
 
         # Pipeline state
         text += self.content_profile.fmt("Pipeline\n", bold=True)
-        text += self.show_pipeline(stream.total_elements, context.log_element_format)
+        # Check if the output of show pipeline has already been generated for stream total elements
+        if stream.total_pipeline_render:
+            text += stream.total_pipeline_render
+        else:
+            text += self.show_pipeline(stream.total_elements, context.log_element_format)
         text += '\n'
 
         # Separator line before following output
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2b9f9e6..b6539f6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -71,6 +71,7 @@ class NotificationType(FastEnum):
     ELEMENT_TOTALS = "element_totals"
     FINISH = "finish"
     SIGTSTP = "sigstp"
+    SHOW_PIPELINE = "show_pipeline"
 
 
 # Notification()
@@ -95,7 +96,8 @@ class Notification():
                  task_error=None,
                  exception=None,
                  task_groups=None,
-                 element_totals=None):
+                 element_totals=None,
+                 show_pipeline = None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -107,6 +109,7 @@ class Notification():
         self.exception = exception
         self.task_groups = task_groups # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
+        self.show_pipeline = show_pipeline # Output of LogLine.show_pipeline() cb, to represent pipeline state
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 2e6e650..61a8322 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -76,6 +76,7 @@ class Stream():
         self.len_session_elements = ''
         self.len_total_elements = ''
         self.loop = None
+        self.total_pipeline_render = None
 
         #
         # Private members
@@ -105,6 +106,7 @@ class Stream():
         self._notify_back_queue = None
         self._casd_process = None
         self._watcher = None
+        self._pipeline_render_callback = None
 
     # init()
     #
@@ -1456,6 +1458,10 @@ class Stream():
                                         element_totals=element_totals))
 
 
+        # Also send through the pipeline renderer output for heading & summary rendering
+        total_pipeline_render = self._pipeline_render_callback(self.total_elements, self._context.log_element_format) # pylint: disable=not-callable
+        self._notify_front(Notification(NotificationType.SHOW_PIPELINE, show_pipeline=total_pipeline_render))
+
         if self._session_start_callback is not None:
             self._notify_front(Notification(NotificationType.START))
 
@@ -1794,6 +1800,8 @@ class Stream():
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.SHOW_PIPELINE:
+            self.total_pipeline_render = notification.show_pipeline
         elif notification.notification_type == NotificationType.FINISH:
             if self.loop:
                 self.loop.stop()


[buildstream] 09/16: Failed shell to load via name if no plugintable state

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7ace1516110cfb8508c394d65d09a2b29c697303
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Oct 2 14:09:44 2019 +0100

    Failed shell to load via name if no plugintable state
---
 src/buildstream/_frontend/app.py |  2 +-
 src/buildstream/_stream.py       | 11 +++++++++--
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 45160af..4fd632d 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -643,7 +643,7 @@ class App():
                         unique_id, element_key = element
                         prompt = self.shell_prompt(full_name, element_key)
                         self.stream.shell(None, Scope.BUILD, prompt, isolate=True,
-                                          usebuildtree='always', unique_id=unique_id)
+                                          usebuildtree='always', unique_id=unique_id, full_name=full_name)
                     except BstError as e:
                         click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
                 elif choice == 'log':
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index cdfa06c..d034e77 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -230,6 +230,7 @@ class Stream():
     #    usebuildtree (str): Whether to use a buildtree as the source, given cli option
     #    pull_dependencies ([Element]|None): Elements to attempt to pull
     #    unique_id: (str): Whether to use a unique_id to load an Element instance
+    #    full_name: (str): The elements full name, used if unique_id lookup fails
     #
     # Returns:
     #    (int): The exit code of the launched shell
@@ -241,11 +242,17 @@ class Stream():
               command=None,
               usebuildtree=None,
               pull_dependencies=None,
-              unique_id=None):
+              unique_id=None,
+              full_name=None):
 
         # Load the Element via the unique_id if given
         if unique_id and element is None:
-            element = Plugin._lookup(unique_id)
+            try:
+                element = Plugin._lookup(unique_id)
+            except AssertionError:
+                # Could not be loaded from plugintable, load forcefully
+                element_list = self.load_selection([full_name], selection=PipelineSelection.NONE)
+                element = element_list[0]
 
         # Assert we have everything we need built, unless the directory is specified
         # in which case we just blindly trust the directory, using the element


[buildstream] 04/16: Add notifications for session_start & task_groups

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 3a5f5aec55797dd7f9c052ee48597b8c090dead2
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 2d152b2..6677c76 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -66,6 +66,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -88,8 +90,8 @@ class Notification():
                  element=None,
                  message=None,
                  task_error=None,
-                 for_scheduler=False,
-                 exception=None):
+                 exception=None,
+                 task_groups=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -99,6 +101,7 @@ class Notification():
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -228,6 +231,14 @@ class Scheduler():
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state taskgroups if we're running under the subprocess
+        if self._notify_front:
+            # Don't pickle state
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index bc78f72..b28e40f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1429,7 +1429,10 @@ class Stream():
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1724,7 +1727,9 @@ class Stream():
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1751,6 +1756,8 @@ class Stream():
         elif notification.notification_type == NotificationType.EXCEPTION:
             # Regenerate the exception here, so we don't have to pickle it
             raise SubprocessException(**notification.exception)
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")
 


[buildstream] 16/16: signal debug

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit aae4bbc4342dc4d30e2081279c26e34a6150db30
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Oct 29 16:03:46 2019 +0000

    signal debug
---
 src/buildstream/_frontend/app.py        | 8 +++++---
 src/buildstream/_scheduler/scheduler.py | 5 +++++
 src/buildstream/_stream.py              | 3 +++
 3 files changed, 13 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index e146de9..d8ec3e7 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -518,14 +518,16 @@ class App():
 
             try:
                 choice = click.prompt("Choice:",
-                                      value_proc=_prefix_choice_value_proc(['continue', 'quit', 'terminate']),
+                                      value_proc=_prefix_choice_value_proc(['continue', 'quit', 'terminate', 'e']),
                                       default='continue', err=True)
             except click.Abort:
                 # Ensure a newline after automatically printed '^C'
                 click.echo("", err=True)
                 choice = 'terminate'
-
-            if choice == 'terminate':
+            if choice == 'e':
+                from traceback import print_stack
+                print_stack(file=sys.stderr)
+            elif choice == 'terminate':
                 click.echo("\nTerminating all jobs at user request\n", err=True)
                 self.stream.terminate()
             else:
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3640d7b..2fe6532 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -534,6 +534,7 @@ class Scheduler():
 
         if not self._notify_front_queue:
             # Not running in a subprocess, scheduler process to handle keyboard interrupt
+            print("Scheduler called an interrupt")
             notification = Notification(NotificationType.INTERRUPT)
             self._notify_front(notification)
 
@@ -566,15 +567,19 @@ class Scheduler():
     # Connects our signal handler event callbacks to the mainloop
     #
     def _connect_signals(self):
+        print("sched is connecting {}".format(os.getpid()))
         self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
         self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
         self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
 
     def _disconnect_signals(self):
+        print("sched is disconnecting {}".format(os.getpid()))
         self.loop.remove_signal_handler(signal.SIGINT)
         self.loop.remove_signal_handler(signal.SIGTSTP)
         self.loop.remove_signal_handler(signal.SIGTERM)
 
+        signal.signal(signal.SIGINT, signal.SIG_IGN)
+
     def _terminate_jobs_real(self):
         # 20 seconds is a long time, it can take a while and sometimes
         # we still fail, need to look deeper into this again.
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 21c5410..b5d2cd2 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1860,6 +1860,7 @@ class Stream():
 
     def _connect_signals(self):
         if self.loop:
+            print("stream is connecting {}".format(os.getpid()))
             self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
             self.loop.add_signal_handler(signal.SIGTERM,
                                          lambda: self._notify_back(Notification(NotificationType.TERMINATE)))
@@ -1868,9 +1869,11 @@ class Stream():
 
     def _disconnect_signals(self):
         if self.loop:
+            print("stream is disconnecting {}".format(os.getpid()))
             self.loop.remove_signal_handler(signal.SIGINT)
             self.loop.remove_signal_handler(signal.SIGTSTP)
             self.loop.remove_signal_handler(signal.SIGTERM)
+            #signal.signal(signal.SIGINT, signal.SIG_IGN)
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing


[buildstream] 01/16: scheduler.py: Notification for last_task_error propagation

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 40ce52da69180415a7a2e5831ca96d257d695b1a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Sep 10 15:10:04 2019 +0100

    scheduler.py: Notification for last_task_error propagation
    
    Add a notification for TASK_ERROR. As queues and job handlers will
    be running in a different process from the frontend, the global
    last-task-error state held by _exceptions.py in the frontend process
    needs to be notified. This is used internally for the BST_TEST_SUITE.
---
 src/buildstream/_scheduler/jobs/job.py     |  6 +++---
 src/buildstream/_scheduler/queues/queue.py |  4 ++--
 src/buildstream/_scheduler/scheduler.py    | 20 +++++++++++++++++++-
 src/buildstream/_stream.py                 |  4 +++-
 4 files changed, 27 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 940b7d2..88df820 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -30,7 +30,7 @@ import sys
 import traceback
 
 # BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
+from ..._exceptions import ImplError, BstError, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
 from ... import _signals, utils
@@ -541,8 +541,8 @@ class Job():
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message['domain'],
-                                envelope.message['reason'])
+            self._scheduler.set_last_task_error(envelope.message['domain'],
+                                                envelope.message['reason'])
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 49fae56..664741b 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
-from ..._exceptions import BstError, ImplError, set_last_task_error
+from ..._exceptions import BstError, ImplError
 from ..._message import Message, MessageType
 from ...types import FastEnum
 
@@ -320,7 +320,7 @@ class Queue():
             #
             # This just allows us stronger testing capability
             #
-            set_last_task_error(e.domain, e.reason)
+            self._scheduler.set_last_task_error(e.domain, e.reason)
 
         except Exception:   # pylint: disable=broad-except
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index d3faa2a..c85c141 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -63,6 +63,7 @@ class NotificationType(FastEnum):
     SUSPENDED = "suspended"
     RETRY = "retry"
     MESSAGE = "message"
+    TASK_ERROR = "task_error"
 
 
 # Notification()
@@ -83,7 +84,8 @@ class Notification():
                  job_status=None,
                  time=None,
                  element=None,
-                 message=None):
+                 message=None,
+                 task_error=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -91,6 +93,7 @@ class Notification():
         self.time = time
         self.element = element
         self.message = message
+        self.task_error = task_error  # Tuple of domain & reason
 
 
 # Scheduler()
@@ -326,6 +329,21 @@ class Scheduler():
     def notify_messenger(self, message):
         self._notify(Notification(NotificationType.MESSAGE, message=message))
 
+    # set_last_task_error()
+    #
+    # Save the last error domain / reason reported from a child job or queue
+    # in the main process.
+    #
+    # Args:
+    #    domain (ErrorDomain): Enum for the domain from which the error occurred
+    #    reason (str): String identifier representing the reason for the error
+    #
+    def set_last_task_error(self, domain, reason):
+        task_error = domain, reason
+        notification = Notification(NotificationType.TASK_ERROR,
+                                    task_error=task_error)
+        self._notify(notification)
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c7ada6e..63c09da 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -31,7 +31,7 @@ from fnmatch import fnmatch
 from collections import deque
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
@@ -1683,6 +1683,8 @@ class Stream():
             self._scheduler_terminated = True
         elif notification.notification_type == NotificationType.SUSPENDED:
             self._scheduler_suspended = not self._scheduler_suspended
+        elif notification.notification_type == NotificationType.TASK_ERROR:
+            set_last_task_error(*notification.task_error)
         else:
             raise StreamError("Unrecognised notification type received")
 


[buildstream] 02/16: Add in dual queue implementation for subprocess build

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 889a6ea39390214d62294f5eee5e37b1a69f8c43
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Sep 16 12:20:06 2019 +0100

    Add in dual queue implementation for subprocess build
---
 src/buildstream/_scheduler/scheduler.py |  42 ++++++++---
 src/buildstream/_stream.py              | 123 ++++++++++++++++++++++++++------
 src/buildstream/utils.py                |   7 +-
 3 files changed, 142 insertions(+), 30 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index c85c141..5a3da69 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,6 +24,7 @@ import asyncio
 from itertools import chain
 import signal
 import datetime
+import queue
 
 # Local imports
 from .resources import Resources
@@ -64,6 +65,7 @@ class NotificationType(FastEnum):
     RETRY = "retry"
     MESSAGE = "message"
     TASK_ERROR = "task_error"
+    EXCEPTION = "exception"
 
 
 # Notification()
@@ -85,7 +87,9 @@ class Notification():
                  time=None,
                  element=None,
                  message=None,
-                 task_error=None):
+                 task_error=None,
+                 for_scheduler=False,
+                 exception=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -94,6 +98,7 @@ class Notification():
         self.element = element
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
+        self.exception = exception
 
 
 # Scheduler()
@@ -119,7 +124,7 @@ class Notification():
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, notification_queue, notifier):
+                 start_time, state, notifier):
 
         #
         # Public members
@@ -143,8 +148,10 @@ class Scheduler():
         self._state = state
         self._casd_process = None             # handle to the casd process for monitoring purpose
 
-        # Bidirectional queue to send notifications back to the Scheduler's owner
-        self._notification_queue = notification_queue
+        # Bidirectional pipe to send notifications back to the Scheduler's owner
+        self._notify_front = None
+        self._notify_back = None
+        # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
         self.resources = Resources(context.sched_builders,
@@ -189,6 +196,10 @@ class Scheduler():
         self._casd_process = casd_process
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        
+        # Add notification handler
+        if self._notify_back:
+            self.loop.call_later(0.01, self._loop)
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
@@ -580,12 +591,13 @@ class Scheduler():
         queue.enqueue([element])
 
     def _notify(self, notification):
-        # Scheduler to Stream notifcations on right side
-        self._notification_queue.append(notification)
-        self._notifier()
+        # Check if we need to call the notifier callback
+        if self._notify_front:
+            self._notify_front.put(notification)
+        else:
+            self._notifier(notification)
 
-    def _stream_notification_handler(self):
-        notification = self._notification_queue.popleft()
+    def _stream_notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -601,6 +613,18 @@ class Scheduler():
             # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
+    def _loop(self):
+        assert self._notify_back
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notify_back.get_nowait()
+                self._stream_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
+        self.loop.call_later(0.01, self._loop)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 63c09da..7cb3515 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,9 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
+import asyncio
+import functools
+import multiprocessing as mp
 import os
 import sys
 import stat
@@ -26,9 +29,9 @@ import shlex
 import shutil
 import tarfile
 import tempfile
+import queue
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
-from collections import deque
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
@@ -79,13 +82,15 @@ class Stream():
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        self._notification_queue = deque()
+        #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
+        self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
-                                    self._scheduler_notification_handler)
+        # Scheduler may use callback for notification depending on whether it's subprocessed
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
@@ -94,6 +99,8 @@ class Stream():
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
+        self._notify_front = None
+        self._notify_back = None
 
     # init()
     #
@@ -104,11 +111,69 @@ class Stream():
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
 
+    @staticmethod
+    def _subprocess_main(func, notify, *args, **kwargs):
+        # Set main process
+        utils._reset_main_pid()
+        try:
+            func(*args, **kwargs)
+        except Exception as e:
+            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+
+    def run_in_subprocess(self, func, *args, **kwargs):
+        print("Args: {}".format([*args]))
+        print("Kwargs: {}".format(list(kwargs.items())))
+        assert not self._subprocess
+
+        mp_context = mp.get_context(method='fork')
+        process_name = "stream-{}".format(func.__name__)
+        
+        self._notify_front = mp.Queue()
+        self._notify_back = mp.Queue()
+        # Tell the scheduler to not use the notifier callback
+        self._scheduler._notify_front = self._notify_front
+        self._scheduler._notify_back = self._notify_back
+        
+        args = list(args)
+        args.insert(0, self._notify_front)
+        args.insert(0, func)
+        print("launching subprocess:", process_name)
+
+        self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
+                                              kwargs=kwargs, name=process_name)
+
+        self._subprocess.start()
+
+        # TODO connect signal handlers with asyncio
+        while self._subprocess.exitcode is None:
+            # check every given time interval on subprocess state
+            self._subprocess.join(0.01)
+            # if no exit code, go back to checking the message queue
+            self._loop()
+        print("Stopping loop...")
+
+        # Set main process back
+        utils._reset_main_pid()
+
+        # Ensure no more notifcations to process
+        try:
+            while True:
+                notification = self._notify_front.get_nowait()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            print("Finished processing notifications")
+            pass
+
     # cleanup()
     #
     # Cleans up application state
     #
     def cleanup(self):
+        # Close the notification queue
+        for q in [self._notify_back, self._notify_front]:
+            if q is not None:
+                q.close()
+        #self._notification_queue.cancel_join_thread()
         if self._project:
             self._project.cleanup()
 
@@ -233,6 +298,9 @@ class Stream():
         return element._shell(scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command,
                               usebuildtree=buildtree)
 
+    def build(self, *args, **kwargs):
+        self.run_in_subprocess(self._build, *args, **kwargs)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -249,13 +317,13 @@ class Stream():
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
-    def build(self, targets, *,
-              selection=PipelineSelection.PLAN,
-              track_targets=None,
-              track_except=None,
-              track_cross_junctions=False,
-              ignore_junction_targets=False,
-              remote=None):
+    def _build(self, targets, *,
+               selection=PipelineSelection.PLAN,
+               track_targets=None,
+               track_except=None,
+               track_cross_junctions=False,
+               ignore_junction_targets=False,
+               remote=None):
 
         use_config = True
         if remote:
@@ -1657,11 +1725,7 @@ class Stream():
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self):
-        # Check the queue is there
-        assert self._notification_queue
-        notification = self._notification_queue.pop()
-
+    def _scheduler_notification_handler(self, notification):
         if notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
@@ -1671,6 +1735,7 @@ class Stream():
         elif notification.notification_type == NotificationType.JOB_START:
             self._state.add_task(notification.job_action, notification.full_name, notification.time)
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            # State between scheduler & stream is different if ran in subprocces
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
                 self._state.fail_task(notification.job_action, notification.full_name,
@@ -1685,14 +1750,32 @@ class Stream():
             self._scheduler_suspended = not self._scheduler_suspended
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
+        elif notification.notification_type == NotificationType.EXCEPTION:
+            raise notification.exception
         else:
             raise StreamError("Unrecognised notification type received")
 
     def _notify(self, notification):
-        # Stream to scheduler notifcations on left side
-        self._notification_queue.appendleft(notification)
-        self._notifier()
-
+        # Set that the notifcation is for the scheduler
+        #notification.for_scheduler = True
+        if self._notify_back:
+            self._notify_back.put(notification)
+        else:
+            self._scheduler._stream_notification_handler(notification)
+
+    # The code to be run by the Stream's event loop while delegating
+    # work to a subprocess with the @subprocessed decorator
+    def _loop(self):
+        assert self._notify_front
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notify_front.get_nowait()
+                self._scheduler_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
+    
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index de7c14b..75978ef 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -739,6 +739,11 @@ def _is_main_process():
     return os.getpid() == _MAIN_PID
 
 
+def _reset_main_pid():
+    global _MAIN_PID
+    _MAIN_PID = os.getpid()
+
+
 # Recursively remove directories, ignoring file permissions as much as
 # possible.
 def _force_rmtree(rootpath, **kwargs):
@@ -1429,7 +1434,7 @@ def _is_single_threaded():
     # gRPC threads are not joined when shut down. Wait for them to exit.
     wait = 0.1
     for _ in range(0, int(_AWAIT_THREADS_TIMEOUT_SECONDS / wait)):
-        if process.num_threads() == expected_num_threads:
+        if process.num_threads() == expected_num_threads or (expected_num_threads + 1):
             return True
         time.sleep(wait)
     return False


[buildstream] 14/16: Lint fixes

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c357eddbc4af3428632cfb8ad9e5ceb9aeb710db
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Oct 29 12:52:27 2019 +0000

    Lint fixes
---
 src/buildstream/_exceptions.py          |  3 +++
 src/buildstream/_scheduler/scheduler.py |  6 +++---
 src/buildstream/_state.py               |  3 ++-
 src/buildstream/_stream.py              | 30 ++++++++++++++++--------------
 4 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 1468381..0ced045 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -43,6 +43,7 @@ def get_last_exception():
     _last_exception = None
     return le
 
+
 # set_last_exception()
 #
 # Sets the last exception from the main process, used if Stream is running a subprocess
@@ -52,6 +53,7 @@ def set_last_exception(exception):
         global _last_exception
         _last_exception = exception
 
+
 # get_last_task_error()
 #
 # Fetches the last exception from a task
@@ -389,6 +391,7 @@ class ArtifactElementError(BstError):
     def __init__(self, message, *, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.ELEMENT, reason=reason)
 
+
 class SubprocessException(BstError):
     def __init__(self, **kwargs):
         super().__init__(kwargs['message'], detail=kwargs['detail'],
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index b6539f6..cdf1d5a 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -97,7 +97,7 @@ class Notification():
                  exception=None,
                  task_groups=None,
                  element_totals=None,
-                 show_pipeline = None):
+                 show_pipeline=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -107,9 +107,9 @@ class Notification():
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
-        self.task_groups = task_groups # Tuple of queue name, complete name, task change, & optional element name
+        self.task_groups = task_groups  # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
-        self.show_pipeline = show_pipeline # Output of LogLine.show_pipeline() cb, to represent pipeline state
+        self.show_pipeline = show_pipeline  # Output of LogLine.show_pipeline() cb, to represent pipeline state
 
 
 # Scheduler()
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index f35f07a..d78850f 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -89,6 +89,8 @@ class TaskGroup():
             # If name matches group, or if name not given call the cb
             if name == self.name or name is None:
                 cb(name, self.complete_name, 'failed_tasks', full_name)
+
+
 # State
 #
 # The state data that is stored for the purpose of sharing with the frontend.
@@ -258,7 +260,6 @@ class State():
     def unregister_task_groups_changed_callback(self, callback, name=None):
         self._task_groups_changed_cbs.remove((callback, name))
 
-
     ##############################################
     # Core-facing APIs for driving notifications #
     ##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 61a8322..21c5410 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -35,7 +35,8 @@ from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException, set_last_exception
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, \
+    SubprocessException, set_last_exception
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
@@ -346,17 +347,16 @@ class Stream():
         if remote:
             use_config = False
 
-
         elements, track_elements = \
             self._load(targets, track_targets,
-                selection=selection, track_selection=PipelineSelection.ALL,
-                track_except_targets=track_except,
-                track_cross_junctions=track_cross_junctions,
-                ignore_junction_targets=ignore_junction_targets,
-                use_artifact_config=use_config,
-                artifact_remote_url=remote,
-                use_source_config=True,
-                dynamic_plan=True)
+                       selection=selection, track_selection=PipelineSelection.ALL,
+                       track_except_targets=track_except,
+                       track_cross_junctions=track_cross_junctions,
+                       ignore_junction_targets=ignore_junction_targets,
+                       use_artifact_config=use_config,
+                       artifact_remote_url=remote,
+                       use_source_config=True,
+                       dynamic_plan=True)
 
         # Remove the tracking elements from the main targets
         elements = self._pipeline.subtract_elements(elements, track_elements)
@@ -1457,9 +1457,9 @@ class Stream():
         self._notify_front(Notification(NotificationType.ELEMENT_TOTALS,
                                         element_totals=element_totals))
 
-
         # Also send through the pipeline renderer output for heading & summary rendering
-        total_pipeline_render = self._pipeline_render_callback(self.total_elements, self._context.log_element_format) # pylint: disable=not-callable
+        total_pipeline_render = self._pipeline_render_callback(self.total_elements,  # pylint: disable=not-callable
+                                                               self._context.log_element_format)
         self._notify_front(Notification(NotificationType.SHOW_PIPELINE, show_pipeline=total_pipeline_render))
 
         if self._session_start_callback is not None:
@@ -1861,8 +1861,10 @@ class Stream():
     def _connect_signals(self):
         if self.loop:
             self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
-            self.loop.add_signal_handler(signal.SIGTERM, lambda: self._notify_back(Notification(NotificationType.TERMINATE)))
-            self.loop.add_signal_handler(signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP)))
+            self.loop.add_signal_handler(signal.SIGTERM,
+                                         lambda: self._notify_back(Notification(NotificationType.TERMINATE)))
+            self.loop.add_signal_handler(signal.SIGTSTP,
+                                         lambda: self._notify_back(Notification(NotificationType.SIGTSTP)))
 
     def _disconnect_signals(self):
         if self.loop:


[buildstream] 11/16: basic user SIG interrupt handling in Stream

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7870bc3bcbc958e07fc9cbdb320418209a489113
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Oct 21 16:24:33 2019 +0100

    basic user SIG interrupt handling in Stream
---
 src/buildstream/_scheduler/scheduler.py |  9 +++++++--
 src/buildstream/_stream.py              | 21 +++++++++++++++++++--
 2 files changed, 26 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 854921b..74a688f 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -70,6 +70,7 @@ class NotificationType(FastEnum):
     TASK_GROUPS = "task_groups"
     ELEMENT_TOTALS = "element_totals"
     FINISH = "finish"
+    SIGTSTP = "sigstp"
 
 
 # Notification()
@@ -536,8 +537,10 @@ class Scheduler():
         if self.terminated:
             return
 
-        notification = Notification(NotificationType.INTERRUPT)
-        self._notify_front(notification)
+        if not self._notify_front_queue:
+            # Not running in a subprocess, scheduler process to handle keyboard interrupt
+            notification = Notification(NotificationType.INTERRUPT)
+            self._notify_front(notification)
 
     # _terminate_event():
     #
@@ -629,6 +632,8 @@ class Scheduler():
             self.jobs_unsuspended()
         elif notification.notification_type == NotificationType.RETRY:
             self._failure_retry(notification.job_action, notification.element)
+        elif notification.notification_type == NotificationType.SIGTSTP:
+            self._suspend_event()
         else:
             raise ValueError("Unrecognised notification type received")
 
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 8bae8ec..866cbd7 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -30,6 +30,7 @@ import shutil
 import tarfile
 import tempfile
 import queue
+import signal
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 
@@ -150,17 +151,17 @@ class Stream():
         # We can now launch another async
         self.loop = asyncio.new_event_loop()
         asyncio.set_event_loop(self.loop)
+        self._connect_signals()
         self._start_listening()
         self.loop.set_exception_handler(self._handle_exception)
         self._watch_casd()
         self.loop.run_forever()
 
-        # TODO connect signal handlers with asyncio
-
         # Scheduler has stopped running, so safe to still have async here
         self._stop_listening()
         self._stop_watching_casd()
         self.loop.close()
+        self._disconnect_signals()
         self.loop = None
         self._subprocess.join()
         self._subprocess = None
@@ -1221,10 +1222,14 @@ class Stream():
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
         self._notify_back(notification)
+        # Disconnect signals if stream is handling them
+        self._disconnect_signals()
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
         self._notify_back(notification)
+        # Connect signals if stream is handling them
+        self._connect_signals()
 
     #############################################################
     #                    Private Methods                        #
@@ -1835,6 +1840,18 @@ class Stream():
         self._context._subprocess_exception = exception
         self.loop.stop()
 
+    def _connect_signals(self):
+        if self.loop:
+            self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
+            self.loop.add_signal_handler(signal.SIGTERM, lambda: self._notify_back(Notification(NotificationType.TERMINATE)))
+            self.loop.add_signal_handler(signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP)))
+
+    def _disconnect_signals(self):
+        if self.loop:
+            self.loop.remove_signal_handler(signal.SIGINT)
+            self.loop.remove_signal_handler(signal.SIGTSTP)
+            self.loop.remove_signal_handler(signal.SIGTERM)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and


[buildstream] 05/16: Explicitly ensure failed build sources are not pushed

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fdda15d076896d592b1baaa5cd0f2e29eb921b85
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 15:04:07 2019 +0100

    Explicitly ensure failed build sources are not pushed
---
 src/buildstream/element.py | 3 ++-
 tests/sourcecache/push.py  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index dac6eb1..2172268 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1838,7 +1838,8 @@ class Element(Plugin):
         return True
 
     def _skip_source_push(self):
-        if not self.__sources or self._get_workspace():
+        # Skip push if we have no sources, are workspaced or the given element failed to build
+        if not self.__sources or self._get_workspace() or not self._get_build_result()[0]:
             return True
         return not (self.__sourcecache.has_push_remotes(plugin=self) and
                     self._source_cached())
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 406aeba..94e4038 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -295,5 +295,5 @@ def test_source_push_build_fail(cli, tmpdir, datafiles):
         res.assert_task_error(ErrorDomain.ELEMENT, None)
 
         # Sources are not pushed as the build queue is before the source push
-        # queue.
+        # queue. We explicitly don't want to push failed build source by default.
         assert "Pushed source " not in res.stderr


[buildstream] 07/16: Make it more verbose with front & back notifications

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 73889e1cdf436fecccb9a1b2eb8fcb75830094cb
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 17:46:46 2019 +0100

    Make it more verbose with front & back notifications
---
 src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------
 src/buildstream/_stream.py              | 68 ++++++++++++++++-----------------
 2 files changed, 58 insertions(+), 58 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 978ee58..a17d7d2 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -155,8 +155,8 @@ class Scheduler():
         self._casd_process = None             # handle to the casd process for monitoring purpose
 
         # Bidirectional pipe to send notifications back to the Scheduler's owner
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
         # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
@@ -190,7 +190,7 @@ class Scheduler():
         asyncio.set_event_loop(self.loop)
 
         # Notify that the loop has been created
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         # Add timeouts
         self.loop.call_later(1, self._tick)
@@ -204,7 +204,7 @@ class Scheduler():
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
         
         # Add notification handler
-        if self._notify_back:
+        if self._notify_back_queue:
             self.loop.call_later(0.01, self._loop)
 
         # Start the profiler
@@ -225,7 +225,7 @@ class Scheduler():
         self.loop = None
 
         # Notify that the loop has been reset
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         if failed:
             status = SchedStatus.ERROR
@@ -235,12 +235,12 @@ class Scheduler():
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front:
+        if self._notify_front_queue:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
             notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front.put(notification)
+            self._notify_front_queue.put(notification)
 
         return status
 
@@ -279,7 +279,7 @@ class Scheduler():
 
         # Notify the frontend that we're terminated as it might be
         # from an interactive prompt callback or SIGTERM
-        self._notify(Notification(NotificationType.TERMINATED))
+        self._notify_front(Notification(NotificationType.TERMINATED))
         self.loop.call_soon(self._terminate_jobs_real)
 
         # Block this until we're finished terminating jobs,
@@ -340,7 +340,7 @@ class Scheduler():
                                     job_action=job.action_name,
                                     job_status=status,
                                     element=element_info)
-        self._notify(notification)
+        self._notify_front(notification)
         self._sched()
 
     # notify_messenger()
@@ -352,7 +352,7 @@ class Scheduler():
     #                       handler, as assigned by context's messenger.
     #
     def notify_messenger(self, message):
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
     # set_last_task_error()
     #
@@ -367,7 +367,7 @@ class Scheduler():
         task_error = domain, reason
         notification = Notification(NotificationType.TASK_ERROR,
                                     task_error=task_error)
-        self._notify(notification)
+        self._notify_front(notification)
 
     #######################################################
     #                  Local Private Methods              #
@@ -404,7 +404,7 @@ class Scheduler():
                                     full_name=job.name,
                                     job_action=job.action_name,
                                     time=self._state.elapsed_time(start_time=self._starttime))
-        self._notify(notification)
+        self._notify_front(notification)
         job.start()
 
     # _sched_queue_jobs()
@@ -496,7 +496,7 @@ class Scheduler():
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
             # Notify that we're suspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             for job in self._active_jobs:
                 job.suspend()
 
@@ -510,9 +510,9 @@ class Scheduler():
                 job.resume()
             self.suspended = False
             # Notify that we're unsuspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             self._starttime += (datetime.datetime.now() - self._suspendtime)
-            self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
+            self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
             self._suspendtime = None
 
     # _interrupt_event():
@@ -530,7 +530,7 @@ class Scheduler():
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self._notify(notification)
+        self._notify_front(notification)
 
     # _terminate_event():
     #
@@ -589,7 +589,7 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._notify(Notification(NotificationType.TICK))
+        self._notify_front(Notification(NotificationType.TICK))
         self.loop.call_later(1, self._tick)
 
     def _failure_retry(self, action_name, unique_id):
@@ -604,14 +604,14 @@ class Scheduler():
         queue._task_group.failed_tasks.remove(element._get_full_name())
         queue.enqueue([element])
 
-    def _notify(self, notification):
+    def _notify_front(self, notification):
         # Check if we need to call the notifier callback
-        if self._notify_front:
-            self._notify_front.put(notification)
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
             self._notifier(notification)
 
-    def _stream_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -626,12 +626,12 @@ class Scheduler():
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back
+        assert self._notify_back_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_back.get_nowait()
-                self._stream_notification_handler(notification)
+                notification = self._notify_back_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a2b9b8f..cdfa06c 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -90,18 +90,17 @@ class Stream():
         context.messenger.set_state(self._state)
 
         # Scheduler may use callback for notification depending on whether it's subprocessed
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler)
 
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
         self._interrupt_callback = interrupt_callback
-        self._notifier = self._scheduler._stream_notification_handler  # Assign the schedulers notification handler
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
 
     # init()
     #
@@ -129,14 +128,14 @@ class Stream():
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
 
-        self._notify_front = mp.Queue()
-        self._notify_back = mp.Queue()
+        self._notify_front_queue = mp.Queue()
+        self._notify_back_queue = mp.Queue()
         # Tell the scheduler to not use the notifier callback
-        self._scheduler._notify_front = self._notify_front
-        self._scheduler._notify_back = self._notify_back
+        self._scheduler._notify_front_queue = self._notify_front_queue
+        self._scheduler._notify_back_queue = self._notify_back_queue
 
         args = list(args)
-        args.insert(0, self._notify_front)
+        args.insert(0, self._notify_front_queue)
         args.insert(0, func)
 
         self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
@@ -157,8 +156,8 @@ class Stream():
         # Ensure no more notifcations to process
         try:
             while True:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
         except queue.Empty:
             pass
 
@@ -169,7 +168,7 @@ class Stream():
     #
     def cleanup(self):
         # Close the notification queue
-        for q in [self._notify_back, self._notify_front]:
+        for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close()
         #self._notification_queue.cancel_join_thread()
@@ -1184,7 +1183,7 @@ class Stream():
     #
     def terminate(self):
         notification = Notification(NotificationType.TERMINATE)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # quit()
     #
@@ -1194,7 +1193,7 @@ class Stream():
     #
     def quit(self):
         notification = Notification(NotificationType.QUIT)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # suspend()
     #
@@ -1204,11 +1203,11 @@ class Stream():
     def suspend(self):
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
 
     #############################################################
     #                    Private Methods                        #
@@ -1416,7 +1415,7 @@ class Stream():
         notification = Notification(NotificationType.RETRY,
                                     job_action=action_name,
                                     element=unique_id)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # _run()
     #
@@ -1430,18 +1429,13 @@ class Stream():
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            if self._notify_front:
-                self._notify_front.put(Notification(NotificationType.START))
-            else:
-                self._session_start_callback()
+            self._notify_front(Notification(NotificationType.START))
 
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
-        if self._notify_front:
-            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
-                                                element_totals=element_totals))
-        else:
-            self.len_session_elements, self.len_total_elements = element_totals
+        self._notify_front(Notification(NotificationType.ELEMENT_TOTALS,
+                                        element_totals=element_totals))
+
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1735,7 +1729,7 @@ class Stream():
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
             self._state.task_groups = notification.task_groups
         elif notification.notification_type == NotificationType.MESSAGE:
@@ -1772,21 +1766,27 @@ class Stream():
         else:
             raise StreamError("Unrecognised notification type received")
 
-    def _notify(self, notification):
-        if self._notify_back:
-            self._notify_back.put(notification)
+    def _notify_back(self, notification):
+        if self._notify_back_queue:
+            self._notify_back_queue.put(notification)
+        else:
+            self._scheduler._notification_handler(notification)
+
+    def _notify_front(self, notification):
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
-            self._scheduler._stream_notification_handler(notification)
+            self._notification_handler(notification)
 
     # The code to be run by the Stream's event loop while delegating
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front
+        assert self._notify_front_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break


[buildstream] 08/16: Move sched notification poll to loop reader

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f0f29863751531d43db3626ffc2cc06b0845dc5b
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Oct 2 11:30:10 2019 +0100

    Move sched notification poll to loop reader
---
 src/buildstream/_scheduler/scheduler.py | 32 +++++++++++++++++---------------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a17d7d2..a0a87fb 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -202,16 +202,17 @@ class Scheduler():
         self._casd_process = casd_process
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
-        
-        # Add notification handler
-        if self._notify_back_queue:
-            self.loop.call_later(0.01, self._loop)
+
+        # Add notification listener if in subprocess
+        self._start_listening()
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
             self._sched()
             self.loop.run_forever()
+            # Stop listening for notifications
+            self._stop_listening()
             self.loop.close()
 
         # Stop watching casd
@@ -386,7 +387,7 @@ class Scheduler():
     #
     def _abort_on_casd_failure(self, pid, returncode):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
         self._casd_process.returncode = returncode
         self.terminate_jobs()
@@ -626,16 +627,17 @@ class Scheduler():
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_back_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
-        self.loop.call_later(0.01, self._loop)
+        while not self._notify_back_queue.empty():
+            notification = self._notify_back_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_back_queue:
+            self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_back_queue:
+            self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing


[buildstream] 06/16: Add len of session/total elements members to Stream

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8c3f90583aba9cb0442b0025bdc67745f15d170a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 14:51:53 2019 +0100

    Add len of session/total elements members to Stream
---
 src/buildstream/_frontend/status.py     |  4 ++--
 src/buildstream/_frontend/widget.py     |  4 ++--
 src/buildstream/_scheduler/scheduler.py |  5 ++++-
 src/buildstream/_stream.py              | 19 +++++++++++++++----
 4 files changed, 23 insertions(+), 9 deletions(-)

diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index 8da7df0..c648e93 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -373,8 +373,8 @@ class _StatusHeader():
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
-        session = str(len(self._stream.session_elements))
-        total = str(len(self._stream.total_elements))
+        session = self._stream.len_session_elements
+        total = self._stream.len_total_elements
 
         size = 0
         text = ''
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 181ee7d..d40b87c 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -565,8 +565,8 @@ class LogLine(Widget):
         text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
         values = OrderedDict()
 
-        values['Total'] = self.content_profile.fmt(str(len(stream.total_elements)))
-        values['Session'] = self.content_profile.fmt(str(len(stream.session_elements)))
+        values['Total'] = self.content_profile.fmt(stream.len_total_elements)
+        values['Session'] = self.content_profile.fmt(stream.len_session_elements)
 
         processed_maxlen = 1
         skipped_maxlen = 1
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6677c76..978ee58 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -68,6 +68,7 @@ class NotificationType(FastEnum):
     EXCEPTION = "exception"
     START = "start"
     TASK_GROUPS = "task_groups"
+    ELEMENT_TOTALS = "element_totals"
 
 
 # Notification()
@@ -91,7 +92,8 @@ class Notification():
                  message=None,
                  task_error=None,
                  exception=None,
-                 task_groups=None):
+                 task_groups=None,
+                 element_totals=None):
         self.notification_type = notification_type
         self.full_name = full_name
         self.job_action = job_action
@@ -102,6 +104,7 @@ class Notification():
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
         self.task_groups = task_groups
+        self.element_totals = element_totals
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index b28e40f..a2b9b8f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -72,6 +72,8 @@ class Stream():
         self.session_elements = []   # List of elements being processed this session
         self.total_elements = []     # Total list of elements based on targets
         self.queues = []             # Queue objects
+        self.len_session_elements = None
+        self.len_total_elements = None
 
         #
         # Private members
@@ -82,7 +84,6 @@ class Stream():
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        #self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
         self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
@@ -127,13 +128,13 @@ class Stream():
 
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
-        
+
         self._notify_front = mp.Queue()
         self._notify_back = mp.Queue()
         # Tell the scheduler to not use the notifier callback
         self._scheduler._notify_front = self._notify_front
         self._scheduler._notify_back = self._notify_back
-        
+
         args = list(args)
         args.insert(0, self._notify_front)
         args.insert(0, func)
@@ -1434,6 +1435,14 @@ class Stream():
             else:
                 self._session_start_callback()
 
+        # Also send through the session & total elements list lengths for status rendering
+        element_totals = str(len(self.session_elements)), str(len(self.total_elements))
+        if self._notify_front:
+            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
+                                                element_totals=element_totals))
+        else:
+            self.len_session_elements, self.len_total_elements = element_totals
+
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
         if status == SchedStatus.ERROR:
@@ -1758,6 +1767,8 @@ class Stream():
             raise SubprocessException(**notification.exception)
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
+        elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
+            self.len_session_elements, self.len_total_elements = notification.element_totals
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1779,7 +1790,7 @@ class Stream():
             except queue.Empty:
                 notification = None
                 break
-    
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and


[buildstream] 10/16: basic async in stream

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 2fe0c6a939bee0c205dcc650210c4905c07f1902
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Oct 11 10:45:58 2019 +0100

    basic async in stream
---
 src/buildstream/_context.py             |   3 +
 src/buildstream/_exceptions.py          |   8 +++
 src/buildstream/_frontend/app.py        |  53 ++++++++-------
 src/buildstream/_frontend/status.py     |   1 +
 src/buildstream/_scheduler/scheduler.py |  22 +++---
 src/buildstream/_stream.py              | 115 +++++++++++++++++++++-----------
 6 files changed, 130 insertions(+), 72 deletions(-)

diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index 8795550..54f483f 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -175,6 +175,9 @@ class Context():
         self._workspace_project_cache = WorkspaceProjectCache()
         self._cascache = None
 
+        # An exception caught from subprocessing, used to handle run exceptions in App
+        self._subprocess_exception = None
+
     # __enter__()
     #
     # Called when entering the with-statement context.
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index a6f726b..1468381 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -43,6 +43,14 @@ def get_last_exception():
     _last_exception = None
     return le
 
+# set_last_exception()
+#
+# Sets the last exception from the main process, used if Stream is running a subprocess
+#
+def set_last_exception(exception):
+    if 'BST_TEST_SUITE' in os.environ:
+        global _last_exception
+        _last_exception = exception
 
 # get_last_task_error()
 #
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 4fd632d..c021602 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -285,38 +285,23 @@ class App():
             try:
                 yield
             except BstError as e:
+                self._handle_run_exception(e, session_name)
 
-                # Print a nice summary if this is a session
-                if session_name:
-                    elapsed = self.stream.elapsed_time
-
-                    if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
-                        self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
-                    else:
-                        self._message(MessageType.FAIL, session_name, elapsed=elapsed)
-
-                        # Notify session failure
-                        self._notify("{} failed".format(session_name), e)
-
-                    if self._started:
-                        self._print_summary()
-
-                # Exit with the error
-                self._error_exit(e)
             except RecursionError:
                 click.echo("RecursionError: Dependency depth is too large. Maximum recursion depth exceeded.",
                            err=True)
                 sys.exit(-1)
 
-            else:
+            if self.context._subprocess_exception:
+                # If an exception was thrown in a Stream subprocessed asyncio method, handle it
+                self._handle_run_exception(self.context._subprocess_exception, session_name)
+            elif session_name:
                 # No exceptions occurred, print session time and summary
-                if session_name:
-                    self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
-                    if self._started:
-                        self._print_summary()
-
-                    # Notify session success
-                    self._notify("{} succeeded".format(session_name), "")
+                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                if self._started:
+                    self._print_summary()
+                # Notify session success
+                self._notify("{} succeeded".format(session_name), "")
 
     # init_project()
     #
@@ -891,6 +876,24 @@ class App():
 
         return (project_name, format_version, element_path)
 
+    def _handle_run_exception(self, exception, session_name):
+        # Print a nice summary if this is a session
+        if session_name:
+            elapsed = self.stream.elapsed_time
+
+            if isinstance(exception, StreamError) and exception.terminated:  # pylint: disable=no-member
+                self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
+            else:
+                self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+
+                # Notify session failure
+                self._notify("{} failed".format(session_name), exception)
+
+            if self._started:
+                self._print_summary()
+
+        self._error_exit(exception)
+
 
 #
 # Return a value processor for partial choice matching.
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index c648e93..87f35fb 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -373,6 +373,7 @@ class _StatusHeader():
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
+
         session = self._stream.len_session_elements
         total = self._stream.len_total_elements
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a0a87fb..854921b 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -69,6 +69,7 @@ class NotificationType(FastEnum):
     START = "start"
     TASK_GROUPS = "task_groups"
     ELEMENT_TOTALS = "element_totals"
+    FINISH = "finish"
 
 
 # Notification()
@@ -184,6 +185,9 @@ class Scheduler():
         # Hold on to the queues to process
         self.queues = queues
 
+        # Check if we're subprocessed
+        subprocessed = bool(self._notify_front_queue)
+
         # Ensure that we have a fresh new event loop, in case we want
         # to run another test in this thread.
         self.loop = asyncio.new_event_loop()
@@ -198,10 +202,11 @@ class Scheduler():
         # Handle unix signals while running
         self._connect_signals()
 
-        # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        # If we're not in a subprocess, watch casd while running to ensure it doesn't die
+        if not subprocessed:
+            self._casd_process = casd_process
+            _watcher = asyncio.get_child_watcher()
+            _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
         # Add notification listener if in subprocess
         self._start_listening()
@@ -215,9 +220,10 @@ class Scheduler():
             self._stop_listening()
             self.loop.close()
 
-        # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        # Stop watching casd if not subprocessed
+        if self._casd_process:
+            _watcher.remove_child_handler(casd_process.pid)
+            self._casd_process = None
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -236,7 +242,7 @@ class Scheduler():
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front_queue:
+        if subprocessed:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d034e77..8bae8ec 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -34,7 +34,7 @@ from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException, set_last_exception
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
@@ -72,8 +72,9 @@ class Stream():
         self.session_elements = []   # List of elements being processed this session
         self.total_elements = []     # Total list of elements based on targets
         self.queues = []             # Queue objects
-        self.len_session_elements = None
-        self.len_total_elements = None
+        self.len_session_elements = ''
+        self.len_total_elements = ''
+        self.loop = None
 
         #
         # Private members
@@ -101,6 +102,8 @@ class Stream():
         self._scheduler_suspended = False
         self._notify_front_queue = None
         self._notify_back_queue = None
+        self._casd_process = None
+        self._watcher = None
 
     # init()
     #
@@ -121,6 +124,7 @@ class Stream():
             # Send the exceptions members dict to be reraised in main process
             exception_attrs = vars(e)
             notify.put(Notification(NotificationType.EXCEPTION, exception=exception_attrs))
+        notify.put(Notification(NotificationType.FINISH))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         assert not self._subprocess
@@ -143,24 +147,30 @@ class Stream():
 
         self._subprocess.start()
 
+        # We can now launch another async
+        self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
+        self._start_listening()
+        self.loop.set_exception_handler(self._handle_exception)
+        self._watch_casd()
+        self.loop.run_forever()
+
         # TODO connect signal handlers with asyncio
-        while self._subprocess.exitcode is None:
-            # check every given time interval on subprocess state
-            self._subprocess.join(0.01)
-            # if no exit code, go back to checking the message queue
-            self._loop()
 
+        # Scheduler has stopped running, so safe to still have async here
+        self._stop_listening()
+        self._stop_watching_casd()
+        self.loop.close()
+        self.loop = None
+        self._subprocess.join()
+        self._subprocess = None
         # Set main process back
         utils._reset_main_pid()
 
         # Ensure no more notifcations to process
-        try:
-            while True:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-        except queue.Empty:
-            pass
-
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
 
     # cleanup()
     #
@@ -171,7 +181,6 @@ class Stream():
         for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close()
-        #self._notification_queue.cancel_join_thread()
         if self._project:
             self._project.cleanup()
 
@@ -334,16 +343,17 @@ class Stream():
         if remote:
             use_config = False
 
+
         elements, track_elements = \
             self._load(targets, track_targets,
-                       selection=selection, track_selection=PipelineSelection.ALL,
-                       track_except_targets=track_except,
-                       track_cross_junctions=track_cross_junctions,
-                       ignore_junction_targets=ignore_junction_targets,
-                       use_artifact_config=use_config,
-                       artifact_remote_url=remote,
-                       use_source_config=True,
-                       dynamic_plan=True)
+                selection=selection, track_selection=PipelineSelection.ALL,
+                track_except_targets=track_except,
+                track_cross_junctions=track_cross_junctions,
+                ignore_junction_targets=ignore_junction_targets,
+                use_artifact_config=use_config,
+                artifact_remote_url=remote,
+                use_source_config=True,
+                dynamic_plan=True)
 
         # Remove the tracking elements from the main targets
         elements = self._pipeline.subtract_elements(elements, track_elements)
@@ -1435,15 +1445,15 @@ class Stream():
         #
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
-        if self._session_start_callback is not None:
-            self._notify_front(Notification(NotificationType.START))
-
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
         self._notify_front(Notification(NotificationType.ELEMENT_TOTALS,
                                         element_totals=element_totals))
 
 
+        if self._session_start_callback is not None:
+            self._notify_front(Notification(NotificationType.START))
+
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
         if status == SchedStatus.ERROR:
@@ -1764,12 +1774,14 @@ class Stream():
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
-            # Regenerate the exception here, so we don't have to pickle it
             raise SubprocessException(**notification.exception)
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.FINISH:
+            if self.loop:
+                self.loop.stop()
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1785,18 +1797,43 @@ class Stream():
         else:
             self._notification_handler(notification)
 
-    # The code to be run by the Stream's event loop while delegating
-    # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_front_queue:
+            self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_front_queue:
+            self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
+    def _watch_casd(self):
+        if self._context.get_cascache()._casd_process:
+            self._casd_process = self._context.get_cascache().get_casd_process()
+            self._watcher = asyncio.get_child_watcher()
+            self._watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
+
+    def _abort_on_casd_failure(self, pid, returncode):
+        message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
+        self._casd_process.returncode = returncode
+        notification = Notification(NotificationType.TERMINATE)
+        self._notify_back(notification)
+
+    def _stop_watching_casd(self):
+        self._watcher.remove_child_handler(self._casd_process.pid)
+        self._casd_process = None
+
+    def _handle_exception(self, loop, context):
+        exception = context.get('exception')
+        # Set the last exception for the test suite if needed
+        set_last_exception(exception)
+        # Add it to context
+        self._context._subprocess_exception = exception
+        self.loop.stop()
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing


[buildstream] 12/16: Add support for dynamic queue status reporting to frontend State()

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 72c0d08cfa18f6cfff250626a62f62a1745bfe85
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 14:23:55 2019 +0100

    Add support for dynamic queue status reporting to frontend State()
---
 src/buildstream/_scheduler/queues/queue.py |  2 ++
 src/buildstream/_scheduler/scheduler.py    | 15 ++++-----
 src/buildstream/_state.py                  | 53 ++++++++++++++++++++++++------
 src/buildstream/_stream.py                 | 12 ++++++-
 4 files changed, 62 insertions(+), 20 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 664741b..e31259b 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -86,6 +86,8 @@ class Queue():
             self._max_retries = scheduler.context.sched_network_retries
 
         self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
+        self._scheduler._state.register_task_groups_changed_callback(self._scheduler._update_task_groups,
+                                                                     name=self.action_name)
 
     # destroy()
     #
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 74a688f..2b9f9e6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -105,7 +105,7 @@ class Notification():
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
-        self.task_groups = task_groups
+        self.task_groups = task_groups  # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
 
 
@@ -242,14 +242,6 @@ class Scheduler():
         else:
             status = SchedStatus.SUCCESS
 
-        # Send the state taskgroups if we're running under the subprocess
-        if subprocessed:
-            # Don't pickle state
-            for group in self._state.task_groups.values():
-                group._state = None
-            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front_queue.put(notification)
-
         return status
 
     # clear_queues()
@@ -650,6 +642,11 @@ class Scheduler():
         if self._notify_back_queue:
             self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
+    def _update_task_groups(self, name, complete_name, task, full_name=None):
+        if self._notify_front_queue:
+            changes = (name, complete_name, task, full_name)
+            self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index 310e12a..f35f07a 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -54,8 +54,10 @@ class TaskGroup():
     #
     def add_processed_task(self):
         self.processed_tasks += 1
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # Notify cbs registered for this group, or for every group (name None)
+            if name == self.name or name is None:
+                cb(self.name, self.complete_name, 'processed_tasks')
 
     # add_skipped_task()
     #
@@ -65,9 +67,10 @@ class TaskGroup():
     #
     def add_skipped_task(self):
         self.skipped_tasks += 1
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # Notify cbs registered for this group, or for every group (name None)
+            if name == self.name or name is None:
+                cb(self.name, self.complete_name, 'skipped_tasks')
 
     # add_failed_task()
     #
@@ -82,11 +85,10 @@ class TaskGroup():
     #
     def add_failed_task(self, full_name):
         self.failed_tasks.append(full_name)
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
-
-
+        for cb, name in self._state._task_groups_changed_cbs:
+            # Notify cbs registered for this group, or for every group (name None)
+            if name == self.name or name is None:
+                cb(self.name, self.complete_name, 'failed_tasks', full_name)
 # State
 #
 # The state data that is stored for the purpose of sharing with the frontend.
@@ -226,6 +228,37 @@ class State():
     def unregister_task_failed_callback(self, callback):
         self._task_failed_cbs.remove(callback)
 
+    # register_task_groups_changed_callback()
+    #
+    # Registers a callback to be notified when a task group has changed
+    #
+    # Args:
+    #    callback (function): The callback to be notified
+    #    name (str): Optional taskgroup related name, e.g. the action_name of a Queue. If None
+    #                given then the callback will be triggered for any task group changing.
+    #
+    # Callback Args:
+    #    name (str): The name of the task group, e.g. 'build'
+    #    complete_name (str): The complete name of the task group, e.g. 'built'
+    #    task (str): The TaskGroup attribute that changed: 'processed_tasks', 'skipped_tasks' or 'failed_tasks'
+    #    element_name (str): Only given for 'failed_tasks'; the full name of the failed task
+    #
+    def register_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.append((callback, name))
+
+    # unregister_task_groups_changed_callback()
+    #
+    # Unregisters a callback previously registered by
+    # register_task_groups_changed_callback()
+    #
+    # Args:
+    #    callback (function): The callback to be removed
+    #    name (str): Optional taskgroup related name, e.g. the action_name of a Queue
+    #
+    def unregister_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.remove((callback, name))
+
+
     ##############################################
     # Core-facing APIs for driving notifications #
     ##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 866cbd7..2e6e650 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1753,7 +1753,17 @@ class Stream():
 
     def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
-            self._state.task_groups = notification.task_groups
+            queue_name, complete_name, task_event, element_name = notification.task_groups
+            try:
+                group = self._state.task_groups[queue_name]
+            except KeyError:
+                # Queue not yet mirrored in front process, so create it & add it to status output
+                group = self._state.add_task_group(queue_name, complete_name)
+            if element_name is None:
+                count = getattr(group, task_event)
+                setattr(group, task_event, count + 1)
+            else:
+                getattr(group, task_event).append(element_name)
         elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:


[buildstream] 15/16: Fixup sched notification to frontend

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c75339041484e1334b6f56a66aa442667a898036
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Oct 29 13:06:55 2019 +0000

    Fixup sched notification to frontend
---
 src/buildstream/_scheduler/scheduler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index cdf1d5a..3640d7b 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -455,7 +455,7 @@ class Scheduler():
         # Make sure fork is allowed before starting jobs
         if not self.context.prepare_fork():
             message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
-            self._notify(Notification(NotificationType.MESSAGE, message=message))
+            self._notify_front(Notification(NotificationType.MESSAGE, message=message))
             self.terminate_jobs()
             return
 


[buildstream] 03/16: Stop pickling exceptions, regen once off queue

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 02a8fb4778deec93b33d2766f0d4f01637321977
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Sep 25 11:36:01 2019 +0100

    Stop pickling exceptions, regen once off queue
---
 src/buildstream/_exceptions.py          | 12 ++++++++++++
 src/buildstream/_scheduler/scheduler.py |  2 --
 src/buildstream/_stream.py              | 19 ++++++++-----------
 3 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 947b831..a6f726b 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -113,6 +113,8 @@ class BstError(Exception):
 
         super().__init__(message)
 
+        self.message = message
+
         # Additional error detail, these are used to construct detail
         # portions of the logging messages when encountered.
         #
@@ -378,3 +380,13 @@ class SkipJob(Exception):
 class ArtifactElementError(BstError):
     def __init__(self, message, *, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.ELEMENT, reason=reason)
+
+
+class SubprocessException(BstError):
+    """BstError rebuilt in the main process from a subprocess exception's attribute dict."""
+    def __init__(self, **kwargs):
+        super().__init__(kwargs['message'], detail=kwargs['detail'],
+                         domain=kwargs['domain'], reason=kwargs['reason'], temporary=kwargs['temporary'])
+        self.sandbox = kwargs['sandbox']
+        if 'terminated' in kwargs:
+            self.terminated = kwargs['terminated']
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 5a3da69..2d152b2 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -609,8 +609,6 @@ class Scheduler():
         elif notification.notification_type == NotificationType.RETRY:
             self._failure_retry(notification.job_action, notification.element)
         else:
-            # Do not raise exception once scheduler process is separated
-            # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 7cb3515..bc78f72 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -34,7 +34,7 @@ from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error, SubprocessException
 from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, \
     SourcePushQueue, BuildQueue, PullQueue, ArtifactPushQueue, NotificationType, Notification, JobStatus
@@ -117,12 +117,12 @@ class Stream():
         utils._reset_main_pid()
         try:
             func(*args, **kwargs)
-        except Exception as e:
-            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+        except BstError as e:
+            # Send the exception's attribute dict rather than the object; it is rebuilt in the main process
+            exception_attrs = vars(e)
+            notify.put(Notification(NotificationType.EXCEPTION, exception=exception_attrs))
 
     def run_in_subprocess(self, func, *args, **kwargs):
-        print("Args: {}".format([*args]))
-        print("Kwargs: {}".format(list(kwargs.items())))
         assert not self._subprocess
 
         mp_context = mp.get_context(method='fork')
@@ -137,7 +137,6 @@ class Stream():
         args = list(args)
         args.insert(0, self._notify_front)
         args.insert(0, func)
-        print("launching subprocess:", process_name)
 
         self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
                                               kwargs=kwargs, name=process_name)
@@ -150,7 +149,6 @@ class Stream():
             self._subprocess.join(0.01)
             # if no exit code, go back to checking the message queue
             self._loop()
-        print("Stopping loop...")
 
         # Set main process back
         utils._reset_main_pid()
@@ -161,9 +159,9 @@ class Stream():
                 notification = self._notify_front.get_nowait()
                 self._scheduler_notification_handler(notification)
         except queue.Empty:
-            print("Finished processing notifications")
             pass
 
+
     # cleanup()
     #
     # Cleans up application state
@@ -1751,13 +1749,12 @@ class Stream():
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
-            raise notification.exception
+            # Regenerate the exception here, so we don't have to pickle it
+            raise SubprocessException(**notification.exception)
         else:
             raise StreamError("Unrecognised notification type received")
 
     def _notify(self, notification):
-        # Set that the notifcation is for the scheduler
-        #notification.for_scheduler = True
         if self._notify_back:
             self._notify_back.put(notification)
         else: