You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:20:37 UTC

[buildstream] branch tpollard/temp created (now 577d29b)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 577d29b  Reset event loop in scheduler if subprocessed & pytest timeout

This branch includes the following new commits:

     new c3fc4e1  .gitlab-ci.yml: Track new version of freedesktop-sdk to fix overnigth test
     new f45ac38  scheduler.py: Notification for last_task_error propagation
     new 224df9a  Add in dual queue implementation for subprocess build.
     new cba58c7  Introduce tblib to handle subprocess exceptions
     new cf42285  Add notifications for session_start & task_groups
     new e92aa22  Explicitly ensure failed build sources are not pushed
     new 161aabd  Add len of session/total elements members to Stream
     new bf17415  Make it more verbose with front & back notifications
     new a4bafc1  Move sched notification poll to loop reader
     new 04fafcd  Failed shell to load via name if no plugintable state
     new b39e117  basic async in stream
     new 75e9d02  Add support for dynamic queue status reporting to frontend State()
     new 6a9db55  Add support for logger print header displaying pipeline output
     new 3cbddad  Fixup sched notification to frontend
     new 68521e9  Lint fixes
     new 45d8d40  Add some basic type hinting
     new f6ac2ac  Add profile topic for subprocessed stream method
     new 0089029  Apply AsyncioSafeProcess to Stream's multiprocess
     new 577d29b  Reset event loop in scheduler if subprocessed & pytest timeout

The 19 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 16/19: Add some basic type hinting

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 45d8d404ca2c65d5d52d1b5d176e8ebd8b66f279
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Nov 7 14:44:58 2019 +0000

    Add some basic type hinting
---
 src/buildstream/_exceptions.py          |  2 +-
 src/buildstream/_frontend/app.py        |  2 +-
 src/buildstream/_scheduler/scheduler.py | 14 +++++++-------
 src/buildstream/_stream.py              | 29 +++++++++++++++--------------
 4 files changed, 24 insertions(+), 23 deletions(-)

diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 69acc69..01b641e 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -49,7 +49,7 @@ def get_last_exception():
 #
 # Sets the last exception from the main process, used if Stream is running a subprocess
 #
-def set_last_exception(exception):
+def set_last_exception(exception: Exception) -> None:
     if "BST_TEST_SUITE" in os.environ:
         global _last_exception
         _last_exception = exception
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 704a489..b69be1a 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -964,7 +964,7 @@ class App:
 
         return (project_name, format_version, element_path)
 
-    def _handle_run_exception(self, exception, session_name):
+    def _handle_run_exception(self, exception: BstError, session_name: str) -> None:
         # Print a nice summary if this is a session
         if session_name:
             elapsed = self.stream.elapsed_time
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index f0292f9..886867d 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -368,7 +368,7 @@ class Scheduler:
     #    domain (ErrorDomain): Enum for the domain from which the error occurred
     #    reason (str): String identifier representing the reason for the error
     #
-    def set_last_task_error(self, domain, reason):
+    def set_last_task_error(self, domain, reason: str) -> None:
         task_error = domain, reason
         notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
         self._notify_front(notification)
@@ -614,14 +614,14 @@ class Scheduler:
         queue._task_group.failed_tasks.remove(element._get_full_name())
         queue.enqueue([element])
 
-    def _notify_front(self, notification):
+    def _notify_front(self, notification: Notification) -> None:
         # Check if we need to call the notifier callback
         if self._notify_front_queue:
             self._notify_front_queue.put(notification)
         else:
             self._notifier(notification)
 
-    def _notification_handler(self, notification):
+    def _notification_handler(self, notification: Notification) -> None:
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -639,20 +639,20 @@ class Scheduler:
             # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
-    def _loop(self):
+    def _loop(self) -> None:
         while not self._notify_back_queue.empty():
             notification = self._notify_back_queue.get_nowait()
             self._notification_handler(notification)
 
-    def _start_listening(self):
+    def _start_listening(self) -> None:
         if self._notify_back_queue:
             self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
 
-    def _stop_listening(self):
+    def _stop_listening(self) -> None:
         if self._notify_back_queue:
             self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
-    def _update_task_groups(self, name, complete_name, task, full_name=None):
+    def _update_task_groups(self, name: str, complete_name: str, task: str, full_name: str = None) -> None:
         if self._notify_front_queue:
             changes = (name, complete_name, task, full_name)
             self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4dbdebb..d640769 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -133,7 +133,7 @@ class Stream:
         self._sourcecache = self._context.sourcecache
 
     @staticmethod
-    def _subprocess_main(func, notify, *args, **kwargs):
+    def _subprocess_main(func, notify, *args, **kwargs) -> None:
         # Set main process
         utils._set_stream_pid()
 
@@ -1783,7 +1783,7 @@ class Stream:
 
         return element_targets, artifact_refs
 
-    def _notification_handler(self, notification):
+    def _notification_handler(self, notification: Notification) -> None:
         if notification.notification_type == NotificationType.TASK_GROUPS:
             queue_name, complete_name, task_event, element_name = notification.task_groups
             try:
@@ -1833,59 +1833,60 @@ class Stream:
         else:
             raise StreamError("Unrecognised notification type received")
 
-    def _notify_back(self, notification):
+    def _notify_back(self, notification: Notification) -> None:
         if self._notify_back_queue:
             self._notify_back_queue.put(notification)
         else:
             self._scheduler._notification_handler(notification)
 
-    def _notify_front(self, notification):
+    def _notify_front(self, notification: Notification) -> None:
         if self._notify_front_queue:
             self._notify_front_queue.put(notification)
         else:
             self._notification_handler(notification)
 
-    def _loop(self):
+    def _loop(self) -> None:
         while not self._notify_front_queue.empty():
             notification = self._notify_front_queue.get_nowait()
             self._notification_handler(notification)
 
-    def _start_listening(self):
+    def _start_listening(self) -> None:
         if self._notify_front_queue:
             self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
 
-    def _stop_listening(self):
+    def _stop_listening(self) -> None:
         if self._notify_front_queue:
             self.loop.remove_reader(self._notify_front_queue._reader.fileno())
 
-    def _watch_casd(self):
+    def _watch_casd(self) -> None:
         if self._context.get_cascache()._casd_process:
             self._casd_process = self._context.get_cascache().get_casd_process()
             self._watcher = asyncio.get_child_watcher()
             self._watcher.attach_loop(self.loop)
             self._watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
 
-    def _abort_on_casd_failure(self, pid, returncode):
+    def _abort_on_casd_failure(self, pid: int, returncode: int) -> None:
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
         self._notify_front(Notification(NotificationType.MESSAGE, message=message))
         self._casd_process.returncode = returncode
         notification = Notification(NotificationType.TERMINATE)
         self._notify_back(notification)
 
-    def _stop_watching_casd(self):
+    def _stop_watching_casd(self) -> None:
         self._watcher.remove_child_handler(self._casd_process.pid)
         self._watcher.close()
         self._casd_process = None
 
-    def _handle_exception(self, loop, context):
+    def _handle_exception(self, loop, context: dict) -> None:
         exception = context.get("exception")
         # Set the last exception for the test suite if needed
-        set_last_exception(exception)
+        if exception:
+            set_last_exception(exception)
         # Add it to context
         self._context._subprocess_exception = exception
         self.loop.stop()
 
-    def _connect_signals(self):
+    def _connect_signals(self) -> None:
         if self.loop:
             self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
             self.loop.add_signal_handler(
@@ -1895,7 +1896,7 @@ class Stream:
                 signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP))
             )
 
-    def _disconnect_signals(self):
+    def _disconnect_signals(self) -> None:
         if self.loop:
             self.loop.remove_signal_handler(signal.SIGINT)
             self.loop.remove_signal_handler(signal.SIGTSTP)


[buildstream] 07/19: Add len of session/total elements members to Stream

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 161aabd94f020c6f90ec123b54e90b90116a13ab
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 14:51:53 2019 +0100

    Add len of session/total elements members to Stream
---
 src/buildstream/_frontend/status.py     |  4 ++--
 src/buildstream/_frontend/widget.py     |  4 ++--
 src/buildstream/_scheduler/scheduler.py |  5 ++++-
 src/buildstream/_stream.py              | 12 +++++++++++-
 4 files changed, 19 insertions(+), 6 deletions(-)

diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index a3f0d8a..d3132fe 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -357,8 +357,8 @@ class _StatusHeader:
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
-        session = str(len(self._stream.session_elements))
-        total = str(len(self._stream.total_elements))
+        session = self._stream.len_session_elements
+        total = self._stream.len_total_elements
 
         size = 0
         text = ""
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 63fbfbb..7c846bc 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -542,8 +542,8 @@ class LogLine(Widget):
         text += self.content_profile.fmt("Pipeline Summary\n", bold=True)
         values = OrderedDict()
 
-        values["Total"] = self.content_profile.fmt(str(len(stream.total_elements)))
-        values["Session"] = self.content_profile.fmt(str(len(stream.session_elements)))
+        values["Total"] = self.content_profile.fmt(stream.len_total_elements)
+        values["Session"] = self.content_profile.fmt(stream.len_session_elements)
 
         processed_maxlen = 1
         skipped_maxlen = 1
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 07263df..6b5f306 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -71,6 +71,7 @@ class NotificationType(FastEnum):
     EXCEPTION = "exception"
     START = "start"
     TASK_GROUPS = "task_groups"
+    ELEMENT_TOTALS = "element_totals"
 
 
 # Notification()
@@ -94,7 +95,8 @@ class Notification:
         message=None,
         task_error=None,
         exception=None,
-        task_groups=None
+        task_groups=None,
+        element_totals=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -106,6 +108,7 @@ class Notification:
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
         self.task_groups = task_groups
+        self.element_totals = element_totals
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 7bb2f5d..841ccde 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -89,6 +89,8 @@ class Stream:
         self.session_elements = []  # List of elements being processed this session
         self.total_elements = []  # Total list of elements based on targets
         self.queues = []  # Queue objects
+        self.len_session_elements = None
+        self.len_total_elements = None
 
         #
         # Private members
@@ -99,7 +101,6 @@ class Stream:
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        # self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
         self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
@@ -1453,6 +1454,13 @@ class Stream:
             else:
                 self._session_start_callback()
 
+        # Also send through the session & total elements list lengths for status rendering
+        element_totals = str(len(self.session_elements)), str(len(self.total_elements))
+        if self._notify_front:
+            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
+        else:
+            self.len_session_elements, self.len_total_elements = element_totals
+
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
         if status == SchedStatus.ERROR:
@@ -1771,6 +1779,8 @@ class Stream:
             raise notification.exception.re_raise()
         elif notification.notification_type == NotificationType.START:
             self._session_start_callback()
+        elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
+            self.len_session_elements, self.len_total_elements = notification.element_totals
         else:
             raise StreamError("Unrecognised notification type received")
 


[buildstream] 12/19: Add support for dynamic queue status reporting to frontend State()

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 75e9d023ee424f4fb24e83908e199db891c54906
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 14:23:55 2019 +0100

    Add support for dynamic queue status reporting to frontend State()
---
 src/buildstream/_scheduler/queues/queue.py |  3 ++
 src/buildstream/_scheduler/scheduler.py    | 15 ++++-----
 src/buildstream/_state.py                  | 50 +++++++++++++++++++++++++-----
 src/buildstream/_stream.py                 | 12 ++++++-
 4 files changed, 62 insertions(+), 18 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 79cb162..77a9157 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -86,6 +86,9 @@ class Queue:
             self._max_retries = scheduler.context.sched_network_retries
 
         self._task_group = self._scheduler._state.add_task_group(self.action_name, self.complete_name)
+        self._scheduler._state.register_task_groups_changed_callback(
+            self._scheduler._update_task_groups, name=self.action_name
+        )
 
     # destroy()
     #
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index fbe2599..ac52b0e 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -109,7 +109,7 @@ class Notification:
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
-        self.task_groups = task_groups
+        self.task_groups = task_groups  # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
 
 
@@ -243,14 +243,6 @@ class Scheduler:
         else:
             status = SchedStatus.SUCCESS
 
-        # Send the state taskgroups if we're running under the subprocess
-        if subprocessed:
-            # Don't pickle state
-            for group in self._state.task_groups.values():
-                group._state = None
-            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front_queue.put(notification)
-
         return status
 
     # clear_queues()
@@ -658,6 +650,11 @@ class Scheduler:
         if self._notify_back_queue:
             self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
+    def _update_task_groups(self, name, complete_name, task, full_name=None):
+        if self._notify_front_queue:
+            changes = (name, complete_name, task, full_name)
+            self._notify_front(Notification(NotificationType.TASK_GROUPS, task_groups=changes))
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_state.py b/src/buildstream/_state.py
index d85e348..c0c29e9 100644
--- a/src/buildstream/_state.py
+++ b/src/buildstream/_state.py
@@ -54,8 +54,10 @@ class TaskGroup:
     #
     def add_processed_task(self):
         self.processed_tasks += 1
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, "processed_tasks")
 
     # add_skipped_task()
     #
@@ -65,9 +67,10 @@ class TaskGroup:
     #
     def add_skipped_task(self):
         self.skipped_tasks += 1
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, "skipped_tasks")
 
     # add_failed_task()
     #
@@ -82,9 +85,10 @@ class TaskGroup:
     #
     def add_failed_task(self, full_name):
         self.failed_tasks.append(full_name)
-
-        for cb in self._state._task_groups_changed_cbs:
-            cb()
+        for cb, name in self._state._task_groups_changed_cbs:
+            # If name matches group, or if name not given call the cb
+            if name == self.name or name is None:
+                cb(name, self.complete_name, "failed_tasks", full_name)
 
 
 # State
@@ -226,6 +230,36 @@ class State:
     def unregister_task_failed_callback(self, callback):
         self._task_failed_cbs.remove(callback)
 
+    # register_task_groups_changed_callback()
+    #
+    # Registers a callback to be notified when a task group has changed
+    #
+    # Args:
+    #    callback (function): The callback to be notified
+    #    name (str): Optional taskgroup related name, e.g. the action_name of a Queue. If None
+    #                given then the callback will be triggered for any task group changing.
+    #
+    # Callback Args:
+    #    name (str): The name of the task group, e.g. 'build'
+    #    complete_name (str): The complete name of the task group, e.g. 'built'
+    #    task(str): The full name of the task outcome, processed, skipped or failed.
+    #    element_name (str): Optional if an element task failed, the element name
+    #
+    def register_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.append((callback, name))
+
+    # unregister_task_groups_changed_callback()
+    #
+    # Unregisters a callback previously registered by
+    # register_task_groups_changed_callback()
+    #
+    # Args:
+    #    callback (function): The callback to be removed
+    #    name (str): Optional taskgroup related name, e.g. the action_name of a Queue
+    #
+    def unregister_task_groups_changed_callback(self, callback, name=None):
+        self._task_groups_changed_cbs.remove((callback, name))
+
     ##############################################
     # Core-facing APIs for driving notifications #
     ##############################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 4ac73e6..789e14a 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1778,7 +1778,17 @@ class Stream:
 
     def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
-            self._state.task_groups = notification.task_groups
+            queue_name, complete_name, task_event, element_name = notification.task_groups
+            try:
+                group = self._state.task_groups[queue_name]
+            except KeyError:
+                # Queue not yet mirrored in front process, so create it & add it to status output
+                group = self._state.add_task_group(queue_name, complete_name)
+            if element_name is None:
+                count = getattr(group, task_event)
+                setattr(group, task_event, count + 1)
+            else:
+                getattr(group, task_event).append(element_name)
         elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:


[buildstream] 05/19: Add notifications for session_start & task_groups

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit cf42285881dc7e75ff5278fc9ccc71e93c97dd80
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 11:50:15 2019 +0100

    Add notifications for session_start & task_groups
---
 src/buildstream/_scheduler/scheduler.py | 15 +++++++++++++--
 src/buildstream/_stream.py              | 11 +++++++++--
 2 files changed, 22 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 8a2dbf6..07263df 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -69,6 +69,8 @@ class NotificationType(FastEnum):
     MESSAGE = "message"
     TASK_ERROR = "task_error"
     EXCEPTION = "exception"
+    START = "start"
+    TASK_GROUPS = "task_groups"
 
 
 # Notification()
@@ -91,8 +93,8 @@ class Notification:
         element=None,
         message=None,
         task_error=None,
-        for_scheduler=None,
-        exception=None
+        exception=None,
+        task_groups=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -103,6 +105,7 @@ class Notification:
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
         self.exception = exception
+        self.task_groups = task_groups
 
 
 # Scheduler()
@@ -228,6 +231,14 @@ class Scheduler:
         else:
             status = SchedStatus.SUCCESS
 
+        # Send the state taskgroups if we're running under the subprocess
+        if self._notify_front:
+            # Don't pickle state
+            for group in self._state.task_groups.values():
+                group._state = None
+            notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
+            self._notify_front.put(notification)
+
         return status
 
     # clear_queues()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 1b4ba0e..7bb2f5d 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1448,7 +1448,10 @@ class Stream:
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            self._session_start_callback()
+            if self._notify_front:
+                self._notify_front.put(Notification(NotificationType.START))
+            else:
+                self._session_start_callback()
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1739,7 +1742,9 @@ class Stream:
         return element_targets, artifact_refs
 
     def _scheduler_notification_handler(self, notification):
-        if notification.notification_type == NotificationType.MESSAGE:
+        if notification.notification_type == NotificationType.TASK_GROUPS:
+            self._state.task_groups = notification.task_groups
+        elif notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
             self._interrupt_callback()
@@ -1764,6 +1769,8 @@ class Stream:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
             raise notification.exception.re_raise()
+        elif notification.notification_type == NotificationType.START:
+            self._session_start_callback()
         else:
             raise StreamError("Unrecognised notification type received")
 


[buildstream] 06/19: Explicitly ensure failed build sources are not pushed

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e92aa22e52719d5e5f649f8bf895d96584e196bf
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Sep 26 15:04:07 2019 +0100

    Explicitly ensure failed build sources are not pushed
---
 src/buildstream/element.py | 3 ++-
 tests/sourcecache/push.py  | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 97d9294..de15ef0 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1918,7 +1918,8 @@ class Element(Plugin):
         return True
 
     def _skip_source_push(self):
-        if not self.__sources or self._get_workspace():
+        # Skip push if we have no sources, are workspaced or the given element failed to build
+        if not self.__sources or self._get_workspace() or not self._get_build_result()[0]:
             return True
         return not (self.__sourcecache.has_push_remotes(plugin=self) and self._source_cached())
 
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 7198604..551593e 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -254,5 +254,5 @@ def test_source_push_build_fail(cli, tmpdir, datafiles):
         res.assert_task_error(ErrorDomain.ELEMENT, None)
 
         # Sources are not pushed as the build queue is before the source push
-        # queue.
+        # queue. We explicitly don't want to push failed build source by default.
         assert "Pushed source " not in res.stderr


[buildstream] 11/19: basic async in stream

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b39e1170c102d36601ea853837e451841f091942
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Oct 11 10:45:58 2019 +0100

    basic async in stream
---
 src/buildstream/_context.py             |   3 +
 src/buildstream/_exceptions.py          |  10 +++
 src/buildstream/_frontend/app.py        |  57 ++++++------
 src/buildstream/_frontend/status.py     |   1 +
 src/buildstream/_scheduler/scheduler.py |  54 ++++++++----
 src/buildstream/_stream.py              | 148 ++++++++++++++++++++++++--------
 6 files changed, 196 insertions(+), 77 deletions(-)

diff --git a/src/buildstream/_context.py b/src/buildstream/_context.py
index f426f4b..d7a4437 100644
--- a/src/buildstream/_context.py
+++ b/src/buildstream/_context.py
@@ -174,6 +174,9 @@ class Context:
         self._workspace_project_cache = WorkspaceProjectCache()
         self._cascache = None
 
+        # An exception caught from subprocessing, used to handle run exceptions in App
+        self._subprocess_exception = None
+
     # __enter__()
     #
     # Called when entering the with-statement context.
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index 85fcf61..69acc69 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -45,6 +45,16 @@ def get_last_exception():
     return le
 
 
+# set_last_exception()
+#
+# Sets the last exception from the main process, used if Stream is running a subprocess
+#
+def set_last_exception(exception):
+    if "BST_TEST_SUITE" in os.environ:
+        global _last_exception
+        _last_exception = exception
+
+
 # get_last_task_error()
 #
 # Fetches the last exception from a task
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 471901f..5fe38ce 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -300,39 +300,28 @@ class App:
             try:
                 yield
             except BstError as e:
+                self._handle_run_exception(e, session_name)
 
-                # Print a nice summary if this is a session
-                if session_name:
-                    elapsed = self.stream.elapsed_time
-
-                    if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
-                        self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
-                    else:
-                        self._message(MessageType.FAIL, session_name, elapsed=elapsed)
-
-                        # Notify session failure
-                        self._notify("{} failed".format(session_name), e)
-
-                    if self._started:
-                        self._print_summary()
-
-                # Exit with the error
-                self._error_exit(e)
             except RecursionError:
                 click.echo(
                     "RecursionError: Dependency depth is too large. Maximum recursion depth exceeded.", err=True
                 )
                 sys.exit(-1)
 
-            else:
+            if self.context._subprocess_exception:
+                # If a handled exception was thrown in a Stream subprocessed asyncio method, handle it
+                if isinstance(self.context._subprocess_exception, BstError):
+                    self._handle_run_exception(self.context._subprocess_exception, session_name)
+                else:
+                    # We don't gracefully handle non BstError() Excpetions
+                    raise self.context._subprocess_exception  # pylint: disable=raising-bad-type
+            elif session_name:
                 # No exceptions occurred, print session time and summary
-                if session_name:
-                    self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
-                    if self._started:
-                        self._print_summary()
-
-                    # Notify session success
-                    self._notify("{} succeeded".format(session_name), "")
+                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                if self._started:
+                    self._print_summary()
+                # Notify session success
+                self._notify("{} succeeded".format(session_name), "")
 
     # init_project()
     #
@@ -972,6 +961,24 @@ class App:
 
         return (project_name, format_version, element_path)
 
+    def _handle_run_exception(self, exception, session_name):
+        # Print a nice summary if this is a session
+        if session_name:
+            elapsed = self.stream.elapsed_time
+
+            if isinstance(exception, StreamError) and exception.terminated:  # pylint: disable=no-member
+                self._message(MessageType.WARN, session_name + " Terminated", elapsed=elapsed)
+            else:
+                self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+
+                # Notify session failure
+                self._notify("{} failed".format(session_name), exception)
+
+            if self._started:
+                self._print_summary()
+
+        self._error_exit(exception)
+
 
 #
 # Return a value processor for partial choice matching.
diff --git a/src/buildstream/_frontend/status.py b/src/buildstream/_frontend/status.py
index d3132fe..f16e7d1 100644
--- a/src/buildstream/_frontend/status.py
+++ b/src/buildstream/_frontend/status.py
@@ -357,6 +357,7 @@ class _StatusHeader:
         #
         #  ========= 00:00:00 project-name (143/387) =========
         #
+
         session = self._stream.len_session_elements
         total = self._stream.len_total_elements
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 87853f0..fbe2599 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -72,6 +72,8 @@ class NotificationType(FastEnum):
     START = "start"
     TASK_GROUPS = "task_groups"
     ELEMENT_TOTALS = "element_totals"
+    FINISH = "finish"
+    SIGTSTP = "sigstp"
 
 
 # Notification()
@@ -184,6 +186,9 @@ class Scheduler:
         # Hold on to the queues to process
         self.queues = queues
 
+        # Check if we're subprocessed
+        subprocessed = bool(self._notify_front_queue)
+
         # Ensure that we have a fresh new event loop, in case we want
         # to run another test in this thread.
         self.loop = asyncio.new_event_loop()
@@ -198,10 +203,11 @@ class Scheduler:
         # Handle unix signals while running
         self._connect_signals()
 
-        # Watch casd while running to ensure it doesn't die
-        self._casd_process = casd_process
-        _watcher = asyncio.get_child_watcher()
-        _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
+        # If we're not in a subprocess, watch casd while running to ensure it doesn't die
+        if not subprocessed:
+            self._casd_process = casd_process
+            _watcher = asyncio.get_child_watcher()
+            _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
         # Add notification listener if in subprocess
         self._start_listening()
@@ -215,9 +221,11 @@ class Scheduler:
             self._stop_listening()
             self.loop.close()
 
-        # Stop watching casd
-        _watcher.remove_child_handler(casd_process.pid)
-        self._casd_process = None
+        # Stop watching casd if not subprocessed
+        if self._casd_process:
+            _watcher.remove_child_handler(casd_process.pid)
+            _watcher.close()
+            self._casd_process = None
 
         # Stop handling unix signals
         self._disconnect_signals()
@@ -236,7 +244,7 @@ class Scheduler:
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front_queue:
+        if subprocessed:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
@@ -529,6 +537,8 @@ class Scheduler:
         if self.terminated:
             return
 
+        # This event handler is only set when not running in a subprocess, scheduler
+        # to handle keyboard interrupt
         notification = Notification(NotificationType.INTERRUPT)
         self._notify_front(notification)
 
@@ -558,17 +568,29 @@ class Scheduler:
 
     # _connect_signals():
     #
-    # Connects our signal handler event callbacks to the mainloop
+    # Connects our signal handler event callbacks to the mainloop. Signals
+    # only need to be connected if scheduler running in the 'main' process
     #
     def _connect_signals(self):
-        self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
-        self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
-        self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
+        if not self._notify_front_queue:
+            self.loop.add_signal_handler(signal.SIGINT, self._interrupt_event)
+            self.loop.add_signal_handler(signal.SIGTERM, self._terminate_event)
+            self.loop.add_signal_handler(signal.SIGTSTP, self._suspend_event)
 
+    # _disconnect_signals():
+    #
+    # Disconnects our signal handler event callbacks from the mainloop. Signals
+    # only need to be disconnected if scheduler running in the 'main' process
+    #
     def _disconnect_signals(self):
-        self.loop.remove_signal_handler(signal.SIGINT)
-        self.loop.remove_signal_handler(signal.SIGTSTP)
-        self.loop.remove_signal_handler(signal.SIGTERM)
+        if not self._notify_front_queue:
+            self.loop.remove_signal_handler(signal.SIGINT)
+            self.loop.remove_signal_handler(signal.SIGTSTP)
+            self.loop.remove_signal_handler(signal.SIGTERM)
+        else:
+            # If running in a subprocess, ignore SIGINT when disconnected
+            # under the interrupted click.prompt()
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
 
     def _terminate_jobs_real(self):
         def kill_jobs():
@@ -616,6 +638,8 @@ class Scheduler:
             self.jobs_unsuspended()
         elif notification.notification_type == NotificationType.RETRY:
             self._failure_retry(notification.job_action, notification.element)
+        elif notification.notification_type == NotificationType.SIGTSTP:
+            self._suspend_event()
         else:
             # Do not raise exception once scheduler process is separated
             # as we don't want to pickle exceptions between processes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 06643d9..4ac73e6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -30,6 +30,7 @@ import shutil
 import tarfile
 import tempfile
 import queue
+import signal
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
 from tblib import pickling_support
@@ -43,6 +44,7 @@ from ._exceptions import (
     ArtifactError,
     set_last_task_error,
     SubprocessException,
+    set_last_exception,
 )
 from ._message import Message, MessageType
 from ._scheduler import (
@@ -63,7 +65,7 @@ from ._profile import Topics, PROFILER
 from ._state import State
 from .types import _KeyStrength, _SchedulerErrorAction
 from .plugin import Plugin
-from . import utils, _yaml, _site
+from . import utils, _yaml, _site, _signals
 from . import Scope, Consistency
 
 # Stream()
@@ -89,8 +91,9 @@ class Stream:
         self.session_elements = []  # List of elements being processed this session
         self.total_elements = []  # Total list of elements based on targets
         self.queues = []  # Queue objects
-        self.len_session_elements = None
-        self.len_total_elements = None
+        self.len_session_elements = ""
+        self.len_total_elements = ""
+        self.loop = None
 
         #
         # Private members
@@ -116,6 +119,8 @@ class Stream:
         self._scheduler_suspended = False
         self._notify_front_queue = None
         self._notify_back_queue = None
+        self._casd_process = None
+        self._watcher = None
 
     # init()
     #
@@ -133,10 +138,13 @@ class Stream:
 
         # Add traceback pickling support
         pickling_support.install()
-        try:
-            func(*args, **kwargs)
-        except Exception as e:  # pylint: disable=broad-except
-            notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
+        with _signals.blocked([signal.SIGINT, signal.SIGTERM, signal.SIGTSTP], ignore=True):
+            try:
+                func(*args, **kwargs)
+            except Exception as e:  # pylint: disable=broad-except
+                notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
+
+        notify.put(Notification(NotificationType.FINISH))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         assert not self._subprocess
@@ -160,33 +168,48 @@ class Stream:
 
         self._subprocess.start()
 
-        # TODO connect signal handlers with asyncio
-        while self._subprocess.exitcode is None:
-            # check every given time interval on subprocess state
-            self._subprocess.join(0.01)
-            # if no exit code, go back to checking the message queue
-            self._loop()
-        print("Stopping loop...")
+        # We can now launch another async
+        self.loop = asyncio.new_event_loop()
+        self._connect_signals()
+        self._start_listening()
+        self.loop.set_exception_handler(self._handle_exception)
+        self._watch_casd()
+        self.loop.run_forever()
+
+        # Scheduler has stopped running, so safe to still have async here
+        self._stop_listening()
+        self._stop_watching_casd()
+        self.loop.close()
+        self._disconnect_signals()
+        self.loop = None
+        self._subprocess.join()
+        self._subprocess = None
 
         # Ensure no more notifcations to process
-        try:
-            while True:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-        except queue.Empty:
-            print("Finished processing notifications")
-            pass
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
 
     # cleanup()
     #
     # Cleans up application state
     #
     def cleanup(self):
-        # Close the notification queue
+        # Close the notification queues
         for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close()
-        # self._notification_queue.cancel_join_thread()
+                q.join_thread()
+                q = None
+
+        # Close loop
+        if self.loop is not None:
+            self.loop.close()
+            self.loop = None
+
+        # Ensure global event loop policy is unset
+        asyncio.set_event_loop_policy(None)
+
         if self._project:
             self._project.cleanup()
 
@@ -1249,10 +1272,14 @@ class Stream:
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
         self._notify_back(notification)
+        # Disconnect signals if stream is handling them
+        self._disconnect_signals()
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
         self._notify_back(notification)
+        # Connect signals if stream is handling them
+        self._connect_signals()
 
     #############################################################
     #                    Private Methods                        #
@@ -1454,13 +1481,13 @@ class Stream:
         #
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
-        if self._session_start_callback is not None:
-            self._notify_front(Notification(NotificationType.START))
-
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
         self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
 
+        if self._session_start_callback is not None:
+            self._notify_front(Notification(NotificationType.START))
+
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
         if status == SchedStatus.ERROR:
@@ -1781,6 +1808,9 @@ class Stream:
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.FINISH:
+            if self.loop:
+                self.loop.stop()
         else:
             raise StreamError("Unrecognised notification type received")
 
@@ -1796,18 +1826,62 @@ class Stream:
         else:
             self._notification_handler(notification)
 
-    # The code to be run by the Stream's event loop while delegating
-    # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_front_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
+        while not self._notify_front_queue.empty():
+            notification = self._notify_front_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_front_queue:
+            self.loop.add_reader(self._notify_front_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_front_queue:
+            self.loop.remove_reader(self._notify_front_queue._reader.fileno())
+
+    def _watch_casd(self):
+        if self._context.get_cascache()._casd_process:
+            self._casd_process = self._context.get_cascache().get_casd_process()
+            self._watcher = asyncio.get_child_watcher()
+            self._watcher.attach_loop(self.loop)
+            self._watcher.add_child_handler(self._casd_process.pid, self._abort_on_casd_failure)
+
+    def _abort_on_casd_failure(self, pid, returncode):
+        message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
+        self._casd_process.returncode = returncode
+        notification = Notification(NotificationType.TERMINATE)
+        self._notify_back(notification)
+
+    def _stop_watching_casd(self):
+        self._watcher.remove_child_handler(self._casd_process.pid)
+        self._watcher.close()
+        self._casd_process = None
+
+    def _handle_exception(self, loop, context):
+        exception = context.get("exception")
+        # Set the last exception for the test suite if needed
+        set_last_exception(exception)
+        # Add it to context
+        self._context._subprocess_exception = exception
+        self.loop.stop()
+
+    def _connect_signals(self):
+        if self.loop:
+            self.loop.add_signal_handler(signal.SIGINT, self._interrupt_callback)
+            self.loop.add_signal_handler(
+                signal.SIGTERM, lambda: self._notify_back(Notification(NotificationType.TERMINATE))
+            )
+            self.loop.add_signal_handler(
+                signal.SIGTSTP, lambda: self._notify_back(Notification(NotificationType.SIGTSTP))
+            )
+
+    def _disconnect_signals(self):
+        if self.loop:
+            self.loop.remove_signal_handler(signal.SIGINT)
+            self.loop.remove_signal_handler(signal.SIGTSTP)
+            self.loop.remove_signal_handler(signal.SIGTERM)
+            signal.set_wakeup_fd(-1)
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing


[buildstream] 19/19: Reset event loop in scheduler if subprocessed & pytest timeout

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 577d29b1ea9913a71206ef27782a8d0b760144b0
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Nov 25 14:29:31 2019 +0000

    Reset event loop in scheduler if subprocessed & pytest timeout
---
 .gitlab-ci.yml                          | 2 +-
 src/buildstream/_scheduler/scheduler.py | 4 ++++
 src/buildstream/_stream.py              | 1 +
 3 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index f393be5..f3f8036 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -16,7 +16,7 @@ stages:
 variables:
   PYTEST_ADDOPTS: "--color=yes"
   INTEGRATION_CACHE: "${CI_PROJECT_DIR}/cache/integration-cache"
-  PYTEST_ARGS: "--color=yes --integration -n 2"
+  PYTEST_ARGS: "--color=yes --timeout=1 --integration -n 2"
   TEST_COMMAND: "tox -- ${PYTEST_ARGS}"
   EXTERNAL_TESTS_COMMAND: "tox -e py{35,36,37}-external -- ${PYTEST_ARGS}"
   COVERAGE_PREFIX: "${CI_JOB_NAME}."
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 886867d..6ee20bd 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -235,6 +235,10 @@ class Scheduler:
         failed = any(queue.any_failed_elements() for queue in self.queues)
         self.loop = None
 
+        # Unset the global event loop, if subprocessed
+        if subprocessed:
+            asyncio.set_event_loop_policy(None)
+
         # Notify that the loop has been reset
         self._notify_front(Notification(NotificationType.RUNNING))
 
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e62c255..9fdbcd9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -172,6 +172,7 @@ class Stream:
 
         # We can now launch another async
         self.loop = asyncio.new_event_loop()
+        asyncio.set_event_loop(self.loop)
         self._connect_signals()
         self._start_listening()
         self.loop.set_exception_handler(self._handle_exception)


[buildstream] 18/19: Apply AsyncioSafeProcess to Stream's multiprocess

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 0089029945246b644b3113827db02fae8943d79f
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Nov 12 16:55:12 2019 +0000

    Apply AsyncioSafeProcess to Stream's multiprocess
    
    Note this stops explictly using the get_context object from
    multiprocessing which allows for fork to be used in a process
    where spawn is the default. This obviously breaks the linux CI
    targets for FORCE SPAWN.
---
 src/buildstream/{_scheduler => }/_multiprocessing.py | 0
 src/buildstream/_scheduler/jobs/job.py               | 3 +--
 src/buildstream/_stream.py                           | 6 +++---
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/src/buildstream/_scheduler/_multiprocessing.py b/src/buildstream/_multiprocessing.py
similarity index 100%
rename from src/buildstream/_scheduler/_multiprocessing.py
rename to src/buildstream/_multiprocessing.py
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 4cb80b8..8e90997 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -32,8 +32,7 @@ import traceback
 from ..._exceptions import ImplError, BstError, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
-from ... import _signals, utils
-from .. import _multiprocessing
+from ... import _signals, utils, _multiprocessing
 
 from .jobpickler import pickle_child_job, do_pickled_child_job
 
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 653f150..e62c255 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -63,7 +63,7 @@ from ._profile import Topics, PROFILER
 from ._state import State
 from .types import _KeyStrength, _SchedulerErrorAction
 from .plugin import Plugin
-from . import utils, _yaml, _site, _signals
+from . import utils, _yaml, _site, _signals, _multiprocessing
 from . import Scope, Consistency
 
 
@@ -151,7 +151,7 @@ class Stream:
     def run_in_subprocess(self, func, *args, **kwargs):
         assert not self._subprocess
 
-        mp_context = mp.get_context(method="fork")
+        # mp_context = _multiprocessing.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
 
         self._notify_front_queue = mp.Queue()
@@ -164,7 +164,7 @@ class Stream:
         args.insert(0, self._notify_front_queue)
         args.insert(0, func)
 
-        self._subprocess = mp_context.Process(
+        self._subprocess = _multiprocessing.AsyncioSafeProcess(
             target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name
         )
 


[buildstream] 04/19: Introduce tblib to handle subprocess exceptions

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit cba58c750416f059ba65dbaa309db3dfd290e269
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 31 12:16:18 2019 +0000

    Introduce tblib to handle subprocess exceptions
---
 requirements/requirements.in   |  1 +
 requirements/requirements.txt  |  1 +
 src/buildstream/_exceptions.py | 23 +++++++++++++++++++++--
 src/buildstream/_stream.py     | 20 +++++++++++++++-----
 4 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/requirements/requirements.in b/requirements/requirements.in
index ce721da..5a602dc 100644
--- a/requirements/requirements.in
+++ b/requirements/requirements.in
@@ -8,3 +8,4 @@ ruamel.yaml >= 0.16
 setuptools
 pyroaring
 ujson
+tblib
diff --git a/requirements/requirements.txt b/requirements/requirements.txt
index 7d3d5d6..946535e 100644
--- a/requirements/requirements.txt
+++ b/requirements/requirements.txt
@@ -8,6 +8,7 @@ ruamel.yaml==0.16.5
 setuptools==40.8.0
 pyroaring==0.2.8
 ujson==1.35
+tblib==1.5.0
 ## The following requirements were added by pip freeze:
 MarkupSafe==1.1.1
 ruamel.yaml.clib==0.1.2
diff --git a/src/buildstream/_exceptions.py b/src/buildstream/_exceptions.py
index ca17577..85fcf61 100644
--- a/src/buildstream/_exceptions.py
+++ b/src/buildstream/_exceptions.py
@@ -20,6 +20,7 @@
 
 from enum import Enum, unique
 import os
+import sys
 
 # Disable pylint warnings for whole file here:
 # pylint: disable=global-statement
@@ -239,10 +240,12 @@ class LoadErrorReason(Enum):
 #    reason (LoadErrorReason): machine readable error reason
 #
 # This exception is raised when loading or parsing YAML, or when
-# interpreting project YAML
+# interpreting project YAML. Although reason has a default value,
+# the arg must be assigned to a LoadErrorReason. This is a workaround
+# for unpickling subclassed Exception() classes.
 #
 class LoadError(BstError):
-    def __init__(self, message, reason, *, detail=None):
+    def __init__(self, message, reason=None, *, detail=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.LOAD, reason=reason)
 
 
@@ -385,3 +388,19 @@ class ArtifactElementError(BstError):
 class ProfileError(BstError):
     def __init__(self, message, detail=None, reason=None):
         super().__init__(message, detail=detail, domain=ErrorDomain.PROFILE, reason=reason)
+
+
+# SubprocessException
+#
+# Used with 'tblib.pickling_suport' to pickle the exception & traceback
+# object thrown from subprocessing a Stream entry point, e.g. build().
+# The install() method of pickling_support must be called before attempting
+# to pickle this object.
+#
+class SubprocessException:
+    def __init__(self, exception):
+        self.exception = exception
+        __, __, self.tb = sys.exc_info()
+
+    def re_raise(self):
+        raise self.exception.with_traceback(self.tb)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0a53087..1b4ba0e 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -32,9 +32,18 @@ import tempfile
 import queue
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
+from tblib import pickling_support
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
+from ._exceptions import (
+    StreamError,
+    ImplError,
+    BstError,
+    ArtifactElementError,
+    ArtifactError,
+    set_last_task_error,
+    SubprocessException,
+)
 from ._message import Message, MessageType
 from ._scheduler import (
     Scheduler,
@@ -57,7 +66,6 @@ from .plugin import Plugin
 from . import utils, _yaml, _site
 from . import Scope, Consistency
 
-
 # Stream()
 #
 # This is the main, toplevel calling interface in BuildStream core.
@@ -123,10 +131,12 @@ class Stream:
         # Set main process
         utils._set_stream_pid()
 
+        # Add traceback pickling support
+        pickling_support.install()
         try:
             func(*args, **kwargs)
-        except Exception as e:
-            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+        except Exception as e:  # pylint: disable=broad-except
+            notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
 
     def run_in_subprocess(self, func, *args, **kwargs):
         assert not self._subprocess
@@ -1753,7 +1763,7 @@ class Stream:
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
         elif notification.notification_type == NotificationType.EXCEPTION:
-            raise notification.exception
+            raise notification.exception.re_raise()
         else:
             raise StreamError("Unrecognised notification type received")
 


[buildstream] 17/19: Add profile topic for subprocessed stream method

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f6ac2ac1bd9cf962b0b8b58ff7f739537ce2201f
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Nov 7 17:10:33 2019 +0000

    Add profile topic for subprocessed stream method
---
 src/buildstream/_profile.py | 1 +
 src/buildstream/_stream.py  | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/buildstream/_profile.py b/src/buildstream/_profile.py
index fdde04a..68750b4 100644
--- a/src/buildstream/_profile.py
+++ b/src/buildstream/_profile.py
@@ -48,6 +48,7 @@ class Topics:
     LOAD_PIPELINE = "load-pipeline"
     LOAD_SELECTION = "load-selection"
     SCHEDULER = "scheduler"
+    SUBPROCESS = "subprocess"
     ALL = "all"
 
 
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d640769..653f150 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -141,7 +141,8 @@ class Stream:
         pickling_support.install()
         with _signals.blocked([signal.SIGINT, signal.SIGTERM, signal.SIGTSTP], ignore=True):
             try:
-                func(*args, **kwargs)
+                with PROFILER.profile(Topics.SUBPROCESS, "stream"):
+                    func(*args, **kwargs)
             except Exception as e:  # pylint: disable=broad-except
                 notify.put(Notification(NotificationType.EXCEPTION, exception=SubprocessException(e)))
 


[buildstream] 03/19: Add in dual queue implementation for subprocess build.

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 224df9a07392be4898ace3663fdf5f696e5ab01e
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Sep 16 12:20:06 2019 +0100

    Add in dual queue implementation for subprocess build.
    
    This also adapts utils.py handling of PID to account for
    the stream multiprocessing, and how callers assert that
    they're the 'main_process' or in a job.
---
 doc/source/hacking/coding_guidelines.rst  |   2 +-
 src/buildstream/_messenger.py             |   2 +-
 src/buildstream/_scheduler/scheduler.py   |  42 +++++++++---
 src/buildstream/_stream.py                | 104 ++++++++++++++++++++++++++----
 src/buildstream/_workspaces.py            |   2 +-
 src/buildstream/element.py                |   6 +-
 src/buildstream/sandbox/_sandboxremote.py |   2 +-
 src/buildstream/utils.py                  |  31 ++++++---
 8 files changed, 152 insertions(+), 39 deletions(-)

diff --git a/doc/source/hacking/coding_guidelines.rst b/doc/source/hacking/coding_guidelines.rst
index 4ba6360..769609e 100644
--- a/doc/source/hacking/coding_guidelines.rst
+++ b/doc/source/hacking/coding_guidelines.rst
@@ -624,7 +624,7 @@ In these cases, do **not** raise any of the ``BstError`` class exceptions.
 
 Instead, use the ``assert`` statement, e.g.::
 
-  assert utils._is_main_process(), \
+  assert not utils._is_job_process(), \
       "Attempted to save workspace configuration from child process"
 
 This will result in a ``BUG`` message with the stack trace included being
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 03b2833..9e2269f 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -270,7 +270,7 @@ class Messenger:
         # we also do not allow it in the main process.
         assert self._log_handle is None
         assert self._log_filename is None
-        assert not utils._is_main_process()
+        assert utils._is_job_process()
 
         # Create the fully qualified logfile in the log directory,
         # appending the pid and .log extension at the end.
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index fb666df..8a2dbf6 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,6 +24,7 @@ import asyncio
 from itertools import chain
 import signal
 import datetime
+import queue
 
 # Local imports
 from .resources import Resources
@@ -67,6 +68,7 @@ class NotificationType(FastEnum):
     RETRY = "retry"
     MESSAGE = "message"
     TASK_ERROR = "task_error"
+    EXCEPTION = "exception"
 
 
 # Notification()
@@ -88,7 +90,9 @@ class Notification:
         time=None,
         element=None,
         message=None,
-        task_error=None
+        task_error=None,
+        for_scheduler=None,
+        exception=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -98,6 +102,7 @@ class Notification:
         self.element = element
         self.message = message
         self.task_error = task_error  # Tuple of domain & reason
+        self.exception = exception
 
 
 # Scheduler()
@@ -121,7 +126,7 @@ class Notification:
 #    ticker_callback: A callback call once per second
 #
 class Scheduler:
-    def __init__(self, context, start_time, state, notification_queue, notifier):
+    def __init__(self, context, start_time, state, notifier):
 
         #
         # Public members
@@ -145,8 +150,10 @@ class Scheduler:
         self._state = state
         self._casd_process = None  # handle to the casd process for monitoring purpose
 
-        # Bidirectional queue to send notifications back to the Scheduler's owner
-        self._notification_queue = notification_queue
+        # Bidirectional pipe to send notifications back to the Scheduler's owner
+        self._notify_front = None
+        self._notify_back = None
+        # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
         self.resources = Resources(context.sched_builders, context.sched_fetchers, context.sched_pushers)
@@ -190,6 +197,10 @@ class Scheduler:
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
+        # Add notification handler
+        if self._notify_back:
+            self.loop.call_later(0.01, self._loop)
+
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
@@ -573,12 +584,13 @@ class Scheduler:
         queue.enqueue([element])
 
     def _notify(self, notification):
-        # Scheduler to Stream notifcations on right side
-        self._notification_queue.append(notification)
-        self._notifier()
+        # Check if we need to call the notifier callback
+        if self._notify_front:
+            self._notify_front.put(notification)
+        else:
+            self._notifier(notification)
 
-    def _stream_notification_handler(self):
-        notification = self._notification_queue.popleft()
+    def _stream_notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -594,6 +606,18 @@ class Scheduler:
             # as we don't want to pickle exceptions between processes
             raise ValueError("Unrecognised notification type received")
 
+    def _loop(self):
+        assert self._notify_back
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notify_back.get_nowait()
+                self._stream_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
+        self.loop.call_later(0.01, self._loop)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d907e93..0a53087 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -19,6 +19,9 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 #        Tristan Maat <tr...@codethink.co.uk>
 
+import asyncio
+import functools
+import multiprocessing as mp
 import os
 import sys
 import stat
@@ -26,9 +29,9 @@ import shlex
 import shutil
 import tarfile
 import tempfile
+import queue
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
-from collections import deque
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
 from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
@@ -88,14 +91,13 @@ class Stream:
         self._project = None
         self._pipeline = None
         self._state = State(session_start)  # Owned by Stream, used by Core to set state
-        self._notification_queue = deque()
+        # self._notification_pipe_front, self._notification_pipe_back = mp.Pipe()
+        self._subprocess = None
         self._starttime = session_start  # Synchronised with Scheduler's relative start time
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(
-            context, session_start, self._state, self._notification_queue, self._scheduler_notification_handler
-        )
+        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
@@ -104,6 +106,8 @@ class Stream:
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
+        self._notify_front = None
+        self._notify_back = None
 
     # init()
     #
@@ -114,11 +118,65 @@ class Stream:
         self._artifacts = self._context.artifactcache
         self._sourcecache = self._context.sourcecache
 
+    @staticmethod
+    def _subprocess_main(func, notify, *args, **kwargs):
+        # Set main process
+        utils._set_stream_pid()
+
+        try:
+            func(*args, **kwargs)
+        except Exception as e:
+            notify.put(Notification(NotificationType.EXCEPTION, exception=e))
+
+    def run_in_subprocess(self, func, *args, **kwargs):
+        assert not self._subprocess
+
+        mp_context = mp.get_context(method="fork")
+        process_name = "stream-{}".format(func.__name__)
+
+        self._notify_front = mp.Queue()
+        self._notify_back = mp.Queue()
+        # Tell the scheduler to not use the notifier callback
+        self._scheduler._notify_front = self._notify_front
+        self._scheduler._notify_back = self._notify_back
+
+        args = list(args)
+        args.insert(0, self._notify_front)
+        args.insert(0, func)
+
+        self._subprocess = mp_context.Process(
+            target=Stream._subprocess_main, args=args, kwargs=kwargs, name=process_name
+        )
+
+        self._subprocess.start()
+
+        # TODO connect signal handlers with asyncio
+        while self._subprocess.exitcode is None:
+            # check every given time interval on subprocess state
+            self._subprocess.join(0.01)
+            # if no exit code, go back to checking the message queue
+            self._loop()
+        print("Stopping loop...")
+
+        # Ensure no more notifcations to process
+        try:
+            while True:
+                notification = self._notify_front.get_nowait()
+                self._scheduler_notification_handler(notification)
+        except queue.Empty:
+            print("Finished processing notifications")
+            pass
+
     # cleanup()
     #
     # Cleans up application state
     #
     def cleanup(self):
+        # Close the notification queue
+        for q in [self._notify_back, self._notify_front]:
+            if q is not None:
+                q.close()
+        # self._notification_queue.cancel_join_thread()
         if self._project:
             self._project.cleanup()
 
@@ -261,6 +319,9 @@ class Stream:
             scope, directory, mounts=mounts, isolate=isolate, prompt=prompt, command=command, usebuildtree=buildtree
         )
 
+    def build(self, *args, **kwargs):
+        self.run_in_subprocess(self._build, *args, **kwargs)
+
     # build()
     #
     # Builds (assembles) elements in the pipeline.
@@ -274,7 +335,7 @@ class Stream:
     # If `remote` specified as None, then regular configuration will be used
     # to determine where to push artifacts to.
     #
-    def build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
+    def _build(self, targets, *, selection=PipelineSelection.PLAN, ignore_junction_targets=False, remote=None):
 
         use_config = True
         if remote:
@@ -1667,11 +1728,7 @@ class Stream:
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self):
-        # Check the queue is there
-        assert self._notification_queue
-        notification = self._notification_queue.pop()
-
+    def _scheduler_notification_handler(self, notification):
         if notification.notification_type == NotificationType.MESSAGE:
             self._context.messenger.message(notification.message)
         elif notification.notification_type == NotificationType.INTERRUPT:
@@ -1681,6 +1738,7 @@ class Stream:
         elif notification.notification_type == NotificationType.JOB_START:
             self._state.add_task(notification.job_action, notification.full_name, notification.time)
         elif notification.notification_type == NotificationType.JOB_COMPLETE:
+            # State between scheduler & stream is different if run in a subprocces
             self._state.remove_task(notification.job_action, notification.full_name)
             if notification.job_status == JobStatus.FAIL:
                 self._state.fail_task(notification.job_action, notification.full_name, notification.element)
@@ -1694,13 +1752,31 @@ class Stream:
             self._scheduler_suspended = not self._scheduler_suspended
         elif notification.notification_type == NotificationType.TASK_ERROR:
             set_last_task_error(*notification.task_error)
+        elif notification.notification_type == NotificationType.EXCEPTION:
+            raise notification.exception
         else:
             raise StreamError("Unrecognised notification type received")
 
     def _notify(self, notification):
-        # Stream to scheduler notifcations on left side
-        self._notification_queue.appendleft(notification)
-        self._notifier()
+        # Set that the notifcation is for the scheduler
+        # notification.for_scheduler = True
+        if self._notify_back:
+            self._notify_back.put(notification)
+        else:
+            self._scheduler._stream_notification_handler(notification)
+
+    # The code to be run by the Stream's event loop while delegating
+    # work to a subprocess with the @subprocessed decorator
+    def _loop(self):
+        assert self._notify_front
+        # Check for and process new messages
+        while True:
+            try:
+                notification = self._notify_front.get_nowait()
+                self._scheduler_notification_handler(notification)
+            except queue.Empty:
+                notification = None
+                break
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
diff --git a/src/buildstream/_workspaces.py b/src/buildstream/_workspaces.py
index 3d50fd9..f9636f8 100644
--- a/src/buildstream/_workspaces.py
+++ b/src/buildstream/_workspaces.py
@@ -518,7 +518,7 @@ class Workspaces:
     # create_workspace permanent
     #
     def save_config(self):
-        assert utils._is_main_process()
+        assert not utils._is_job_process()
 
         config = {
             "format-version": BST_WORKSPACE_FORMAT_VERSION,
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 74fb1a0..97d9294 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -773,7 +773,7 @@ class Element(Plugin):
                 self.info("Resetting workspace state, last successful build is no longer in the cache")
 
                 # In case we are staging in the main process
-                if utils._is_main_process():
+                if not utils._is_job_process():
                     context.get_workspaces().save_config()
 
         for dep in self.dependencies(scope):
@@ -798,7 +798,7 @@ class Element(Plugin):
 
                     # In case we are running `bst shell`, this happens in the
                     # main process and we need to update the workspace config
-                    if utils._is_main_process():
+                    if not utils._is_job_process():
                         context.get_workspaces().save_config()
 
             result = dep.stage_artifact(
@@ -1655,7 +1655,7 @@ class Element(Plugin):
         self._update_ready_for_runtime_and_cached()
 
         if self._get_workspace() and self._cached_success():
-            assert utils._is_main_process(), "Attempted to save workspace configuration from child process"
+            assert not utils._is_job_process(), "Attempted to save workspace configuration from child process"
             #
             # Note that this block can only happen in the
             # main process, since `self._cached_success()` cannot
diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py
index d4ffd64..c07ab8c 100644
--- a/src/buildstream/sandbox/_sandboxremote.py
+++ b/src/buildstream/sandbox/_sandboxremote.py
@@ -59,7 +59,7 @@ class SandboxRemote(SandboxREAPI):
             return
 
         # gRPC doesn't support fork without exec, which is used in the main process.
-        assert not utils._is_main_process()
+        assert utils._is_job_process()
 
         self.storage_url = config.storage_service["url"]
         self.exec_url = config.exec_service["url"]
diff --git a/src/buildstream/utils.py b/src/buildstream/utils.py
index 181ea1d..f3caa84 100644
--- a/src/buildstream/utils.py
+++ b/src/buildstream/utils.py
@@ -58,6 +58,9 @@ _URI_SCHEMES = ["http", "https", "ftp", "file", "git", "sftp", "ssh"]
 # Main process pid
 _MAIN_PID = os.getpid()
 
+# This is different to _MAIN_PID if running a subprocessed stream entry point
+_STREAM_PID = _MAIN_PID
+
 # The number of threads in the main process at startup.
 # This is 1 except for certain test environments (xdist/execnet).
 _INITIAL_NUM_THREADS_IN_MAIN_PROCESS = 1
@@ -752,13 +755,18 @@ def _pretty_size(size, dec_places=0):
     return "{size:g}{unit}".format(size=round(psize, dec_places), unit=unit)
 
 
-# _is_main_process()
+# _is_job_process()
 #
-# Return whether we are in the main process or not.
+# Return whether we are in a job process.
 #
-def _is_main_process():
-    assert _MAIN_PID is not None
-    return os.getpid() == _MAIN_PID
+def _is_job_process():
+    assert _STREAM_PID is not None
+    return os.getpid() != _STREAM_PID
+
+
+def _set_stream_pid() -> None:
+    global _STREAM_PID  # pylint: disable=global-statement
+    _STREAM_PID = os.getpid()
 
 
 # Recursively remove directories, ignoring file permissions as much as
@@ -1444,10 +1452,15 @@ def _is_single_threaded():
     # Use psutil as threading.active_count() doesn't include gRPC threads.
     process = psutil.Process()
 
-    if process.pid == _MAIN_PID:
-        expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
-    else:
-        expected_num_threads = 1
+    expected_num_threads = 1
+
+    if process.pid == _STREAM_PID:
+        if _STREAM_PID != _MAIN_PID:
+            # multiprocessing.Queue() has a background thread for object pickling,
+            # see https://docs.python.org/3/library/multiprocessing.html#pipes-and-queues
+            expected_num_threads += 1
+        else:
+            expected_num_threads = _INITIAL_NUM_THREADS_IN_MAIN_PROCESS
 
     # gRPC threads are not joined when shut down. Wait for them to exit.
     wait = 0.1


[buildstream] 09/19: Move sched notification poll to loop reader

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a4bafc125655b73a81cc2e0421f02e508b58660e
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Oct 2 11:30:10 2019 +0100

    Move sched notification poll to loop reader
---
 src/buildstream/_scheduler/scheduler.py | 30 ++++++++++++++++--------------
 1 file changed, 16 insertions(+), 14 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 3476162..87853f0 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -203,15 +203,16 @@ class Scheduler:
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
-        # Add notification handler
-        if self._notify_back_queue:
-            self.loop.call_later(0.01, self._loop)
+        # Add notification listener if in subprocess
+        self._start_listening()
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
             self._sched()
             self.loop.run_forever()
+            # Stop listening for notifications
+            self._stop_listening()
             self.loop.close()
 
         # Stop watching casd
@@ -387,7 +388,7 @@ class Scheduler:
     #
     def _abort_on_casd_failure(self, pid, returncode):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
         self._casd_process.returncode = returncode
         self.terminate_jobs()
@@ -621,16 +622,17 @@ class Scheduler:
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_back_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
-        self.loop.call_later(0.01, self._loop)
+        while not self._notify_back_queue.empty():
+            notification = self._notify_back_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_back_queue:
+            self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_back_queue:
+            self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing


[buildstream] 14/19: Fixup sched notification to frontend

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 3cbddad8f5b8f4de49d1ee3050474d015a111a66
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Oct 29 13:06:55 2019 +0000

    Fixup sched notification to frontend
---
 src/buildstream/_scheduler/scheduler.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 7a85f69..6e768a1 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -457,7 +457,7 @@ class Scheduler:
         # Make sure fork is allowed before starting jobs
         if not self.context.prepare_fork():
             message = Message(MessageType.BUG, "Fork is not allowed", detail="Background threads are active")
-            self._notify(Notification(NotificationType.MESSAGE, message=message))
+            self._notify_front(Notification(NotificationType.MESSAGE, message=message))
             self.terminate_jobs()
             return
 


[buildstream] 10/19: Failed shell to load via name if no plugintable state

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 04fafcd428a8d8bc702e58be533018ed889b6b18
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Oct 2 14:09:44 2019 +0100

    Failed shell to load via name if no plugintable state
---
 src/buildstream/_frontend/app.py |  8 +++++++-
 src/buildstream/_stream.py       | 11 +++++++++--
 2 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 0961085..471901f 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -684,7 +684,13 @@ class App:
                         unique_id, element_key = element
                         prompt = self.shell_prompt(full_name, element_key)
                         self.stream.shell(
-                            None, Scope.BUILD, prompt, isolate=True, usebuildtree="always", unique_id=unique_id
+                            None,
+                            Scope.BUILD,
+                            prompt,
+                            isolate=True,
+                            usebuildtree="always",
+                            unique_id=unique_id,
+                            full_name=full_name,
                         )
                     except BstError as e:
                         click.echo("Error while attempting to create interactive shell: {}".format(e), err=True)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index c8758da..06643d9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -252,6 +252,7 @@ class Stream:
     #    usebuildtree (str): Whether to use a buildtree as the source, given cli option
     #    pull_dependencies ([Element]|None): Elements to attempt to pull
     #    unique_id: (str): Whether to use a unique_id to load an Element instance
+    #    full_name: (str): The elements full name, used if unique_id lookup fails
     #
     # Returns:
     #    (int): The exit code of the launched shell
@@ -268,12 +269,18 @@ class Stream:
         command=None,
         usebuildtree=None,
         pull_dependencies=None,
-        unique_id=None
+        unique_id=None,
+        full_name=None
     ):
 
         # Load the Element via the unique_id if given
         if unique_id and element is None:
-            element = Plugin._lookup(unique_id)
+            try:
+                element = Plugin._lookup(unique_id)
+            except AssertionError:
+                # Could not be loaded from plugintable, load forcefully
+                element_list = self.load_selection([full_name], selection=PipelineSelection.NONE)
+                element = element_list[0]
 
         # Assert we have everything we need built, unless the directory is specified
         # in which case we just blindly trust the directory, using the element


[buildstream] 01/19: .gitlab-ci.yml: Track new version of freedesktop-sdk to fix overnigth test

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c3fc4e14dfe6400a92b83785b8a7dce6e4a12235
Author: Javier Jardón <jj...@gnome.org>
AuthorDate: Wed Nov 20 07:13:07 2019 +0900

    .gitlab-ci.yml: Track new version of freedesktop-sdk to fix overnigth test
    
    This version tracks correct mesa-aco repo
    
    Fixes #1207
---
 .gitlab-ci.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 00270e8..f393be5 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -315,7 +315,7 @@ docs:
   variables:
     BST_EXT_URL: git+https://gitlab.com/BuildStream/bst-plugins-experimental.git
     BST_EXT_REF: 0.12.0-173-gbe5ac19#egg=bst_plugins_experimental[ostree,cargo]
-    FD_SDK_REF: freedesktop-sdk-19.08.3-7-g4529b070
+    FD_SDK_REF: freedesktop-sdk-19.08.3-buildstream2-0-gb66129f8e86acb1b6f35b825607b8e60362773a2
   before_script:
   - |
     mkdir -p "${HOME}/.config"


[buildstream] 15/19: Lint fixes

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 68521e9fe54075155bfa51f603f873a3d0fcc14b
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Oct 29 12:52:27 2019 +0000

    Lint fixes
---
 setup.cfg                               | 2 +-
 src/buildstream/_scheduler/scheduler.py | 1 -
 src/buildstream/_stream.py              | 3 +--
 3 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/setup.cfg b/setup.cfg
index 3637586..78b4fb7 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -30,7 +30,7 @@ warn_no_return = True
 
 # Ignore missing stubs for third-party packages.
 # In future, these should be re-enabled if/when stubs for them become available.
-[mypy-copyreg,arpy,grpc,pluginbase,psutil,py,pyroaring,pytest,_pytest.*,ruamel]
+[mypy-copyreg,arpy,grpc,pluginbase,psutil,py,pyroaring,pytest,_pytest.*,ruamel,tblib]
 ignore_missing_imports=True
 
 # Ignore missing stubs for Cythonized modules.
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6e768a1..f0292f9 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -24,7 +24,6 @@ import asyncio
 from itertools import chain
 import signal
 import datetime
-import queue
 
 # Local imports
 from .resources import Resources
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 550fa0f..4dbdebb 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -20,7 +20,6 @@
 #        Tristan Maat <tr...@codethink.co.uk>
 
 import asyncio
-import functools
 import multiprocessing as mp
 import os
 import sys
@@ -29,7 +28,6 @@ import shlex
 import shutil
 import tarfile
 import tempfile
-import queue
 import signal
 from contextlib import contextmanager, suppress
 from fnmatch import fnmatch
@@ -68,6 +66,7 @@ from .plugin import Plugin
 from . import utils, _yaml, _site, _signals
 from . import Scope, Consistency
 
+
 # Stream()
 #
 # This is the main, toplevel calling interface in BuildStream core.


[buildstream] 08/19: Make it more verbose with front & back notifications

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit bf1741582db3316e00d980e8b012b5b35d64d635
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 17:46:46 2019 +0100

    Make it more verbose with front & back notifications
---
 src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------
 src/buildstream/_stream.py              | 67 ++++++++++++++++-----------------
 2 files changed, 56 insertions(+), 59 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6b5f306..3476162 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -157,8 +157,8 @@ class Scheduler:
         self._casd_process = None  # handle to the casd process for monitoring purpose
 
         # Bidirectional pipe to send notifications back to the Scheduler's owner
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
         # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
@@ -190,7 +190,7 @@ class Scheduler:
         asyncio.set_event_loop(self.loop)
 
         # Notify that the loop has been created
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         # Add timeouts
         self.loop.call_later(1, self._tick)
@@ -204,7 +204,7 @@ class Scheduler:
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
         # Add notification handler
-        if self._notify_back:
+        if self._notify_back_queue:
             self.loop.call_later(0.01, self._loop)
 
         # Start the profiler
@@ -225,7 +225,7 @@ class Scheduler:
         self.loop = None
 
         # Notify that the loop has been reset
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         if failed:
             status = SchedStatus.ERROR
@@ -235,12 +235,12 @@ class Scheduler:
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front:
+        if self._notify_front_queue:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
             notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front.put(notification)
+            self._notify_front_queue.put(notification)
 
         return status
 
@@ -279,7 +279,7 @@ class Scheduler:
 
         # Notify the frontend that we're terminated as it might be
         # from an interactive prompt callback or SIGTERM
-        self._notify(Notification(NotificationType.TERMINATED))
+        self._notify_front(Notification(NotificationType.TERMINATED))
         self.loop.call_soon(self._terminate_jobs_real)
 
         # Block this until we're finished terminating jobs,
@@ -342,7 +342,7 @@ class Scheduler:
             job_status=status,
             element=element_info,
         )
-        self._notify(notification)
+        self._notify_front(notification)
         self._sched()
 
     # notify_messenger()
@@ -354,7 +354,7 @@ class Scheduler:
     #                       handler, as assigned by context's messenger.
     #
     def notify_messenger(self, message):
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
     # set_last_task_error()
     #
@@ -368,7 +368,7 @@ class Scheduler:
     def set_last_task_error(self, domain, reason):
         task_error = domain, reason
         notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
-        self._notify(notification)
+        self._notify_front(notification)
 
     #######################################################
     #                  Local Private Methods              #
@@ -407,7 +407,7 @@ class Scheduler:
             job_action=job.action_name,
             time=self._state.elapsed_time(start_time=self._starttime),
         )
-        self._notify(notification)
+        self._notify_front(notification)
         job.start()
 
     # _sched_queue_jobs()
@@ -497,7 +497,7 @@ class Scheduler:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
             # Notify that we're suspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             for job in self._active_jobs:
                 job.suspend()
 
@@ -511,9 +511,9 @@ class Scheduler:
                 job.resume()
             self.suspended = False
             # Notify that we're unsuspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             self._starttime += datetime.datetime.now() - self._suspendtime
-            self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
+            self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
             self._suspendtime = None
 
     # _interrupt_event():
@@ -529,7 +529,7 @@ class Scheduler:
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self._notify(notification)
+        self._notify_front(notification)
 
     # _terminate_event():
     #
@@ -582,7 +582,7 @@ class Scheduler:
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._notify(Notification(NotificationType.TICK))
+        self._notify_front(Notification(NotificationType.TICK))
         self.loop.call_later(1, self._tick)
 
     def _failure_retry(self, action_name, unique_id):
@@ -597,14 +597,14 @@ class Scheduler:
         queue._task_group.failed_tasks.remove(element._get_full_name())
         queue.enqueue([element])
 
-    def _notify(self, notification):
+    def _notify_front(self, notification):
         # Check if we need to call the notifier callback
-        if self._notify_front:
-            self._notify_front.put(notification)
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
             self._notifier(notification)
 
-    def _stream_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -621,12 +621,12 @@ class Scheduler:
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back
+        assert self._notify_back_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_back.get_nowait()
-                self._stream_notification_handler(notification)
+                notification = self._notify_back_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 841ccde..c8758da 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -106,17 +106,16 @@ class Stream:
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
         self._interrupt_callback = interrupt_callback
-        self._notifier = self._scheduler._stream_notification_handler  # Assign the schedulers notification handler
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
 
     # init()
     #
@@ -145,14 +144,14 @@ class Stream:
         mp_context = mp.get_context(method="fork")
         process_name = "stream-{}".format(func.__name__)
 
-        self._notify_front = mp.Queue()
-        self._notify_back = mp.Queue()
+        self._notify_front_queue = mp.Queue()
+        self._notify_back_queue = mp.Queue()
         # Tell the scheduler to not use the notifier callback
-        self._scheduler._notify_front = self._notify_front
-        self._scheduler._notify_back = self._notify_back
+        self._scheduler._notify_front_queue = self._notify_front_queue
+        self._scheduler._notify_back_queue = self._notify_back_queue
 
         args = list(args)
-        args.insert(0, self._notify_front)
+        args.insert(0, self._notify_front_queue)
         args.insert(0, func)
 
         self._subprocess = mp_context.Process(
@@ -172,8 +171,8 @@ class Stream:
         # Ensure no more notifcations to process
         try:
             while True:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
         except queue.Empty:
             print("Finished processing notifications")
             pass
@@ -184,7 +183,7 @@ class Stream:
     #
     def cleanup(self):
         # Close the notification queue
-        for q in [self._notify_back, self._notify_front]:
+        for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close()
         # self._notification_queue.cancel_join_thread()
@@ -1222,7 +1221,7 @@ class Stream:
     #
     def terminate(self):
         notification = Notification(NotificationType.TERMINATE)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # quit()
     #
@@ -1232,7 +1231,7 @@ class Stream:
     #
     def quit(self):
         notification = Notification(NotificationType.QUIT)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # suspend()
     #
@@ -1242,11 +1241,11 @@ class Stream:
     def suspend(self):
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
 
     #############################################################
     #                    Private Methods                        #
@@ -1435,7 +1434,7 @@ class Stream:
     #
     def _failure_retry(self, action_name, unique_id):
         notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # _run()
     #
@@ -1449,17 +1448,11 @@ class Stream:
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            if self._notify_front:
-                self._notify_front.put(Notification(NotificationType.START))
-            else:
-                self._session_start_callback()
+            self._notify_front(Notification(NotificationType.START))
 
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
-        if self._notify_front:
-            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
-        else:
-            self.len_session_elements, self.len_total_elements = element_totals
+        self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1749,7 +1742,7 @@ class Stream:
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
             self._state.task_groups = notification.task_groups
         elif notification.notification_type == NotificationType.MESSAGE:
@@ -1784,23 +1777,27 @@ class Stream:
         else:
             raise StreamError("Unrecognised notification type received")
 
-    def _notify(self, notification):
-        # Set that the notifcation is for the scheduler
-        # notification.for_scheduler = True
-        if self._notify_back:
-            self._notify_back.put(notification)
+    def _notify_back(self, notification):
+        if self._notify_back_queue:
+            self._notify_back_queue.put(notification)
+        else:
+            self._scheduler._notification_handler(notification)
+
+    def _notify_front(self, notification):
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
-            self._scheduler._stream_notification_handler(notification)
+            self._notification_handler(notification)
 
     # The code to be run by the Stream's event loop while delegating
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front
+        assert self._notify_front_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break


[buildstream] 02/19: scheduler.py: Notification for last_task_error propagation

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f45ac38490af9b79420fca82bd2e3160be56e0cc
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Tue Sep 10 15:10:04 2019 +0100

    scheduler.py: Notification for last_task_error propagation
    
    Add a notification for TASK_ERROR. As queues & job handlers will
    be running in a different process to the front end, the global
    state in the frontend Exception process needs to be notified.
    This is used internally for the BST_TEST_SUITE.
---
 src/buildstream/_scheduler/jobs/job.py     |  4 ++--
 src/buildstream/_scheduler/queues/queue.py |  4 ++--
 src/buildstream/_scheduler/scheduler.py    | 19 ++++++++++++++++++-
 src/buildstream/_stream.py                 |  4 +++-
 4 files changed, 25 insertions(+), 6 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index e7866bc..4cb80b8 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -29,7 +29,7 @@ import sys
 import traceback
 
 # BuildStream toplevel imports
-from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
+from ..._exceptions import ImplError, BstError, SkipJob
 from ..._message import Message, MessageType, unconditional_messages
 from ...types import FastEnum
 from ... import _signals, utils
@@ -491,7 +491,7 @@ class Job:
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message["domain"], envelope.message["reason"])
+            self._scheduler.set_last_task_error(envelope.message["domain"], envelope.message["reason"])
         elif envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 986ac6c..79cb162 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..jobs import ElementJob, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
-from ..._exceptions import BstError, ImplError, set_last_task_error
+from ..._exceptions import BstError, ImplError
 from ..._message import Message, MessageType
 from ...types import FastEnum
 
@@ -326,7 +326,7 @@ class Queue:
             #
             # This just allows us stronger testing capability
             #
-            set_last_task_error(e.domain, e.reason)
+            self._scheduler.set_last_task_error(e.domain, e.reason)
 
         except Exception:  # pylint: disable=broad-except
 
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 171281b..fb666df 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -66,6 +66,7 @@ class NotificationType(FastEnum):
     SUSPENDED = "suspended"
     RETRY = "retry"
     MESSAGE = "message"
+    TASK_ERROR = "task_error"
 
 
 # Notification()
@@ -86,7 +87,8 @@ class Notification:
         job_status=None,
         time=None,
         element=None,
-        message=None
+        message=None,
+        task_error=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -95,6 +97,7 @@ class Notification:
         self.time = time
         self.element = element
         self.message = message
+        self.task_error = task_error  # Tuple of domain & reason
 
 
 # Scheduler()
@@ -328,6 +331,20 @@ class Scheduler:
     def notify_messenger(self, message):
         self._notify(Notification(NotificationType.MESSAGE, message=message))
 
+    # set_last_task_error()
+    #
+    # Save the last error domain / reason reported from a child job or queue
+    # in the main process.
+    #
+    # Args:
+    #    domain (ErrorDomain): Enum for the domain from which the error occurred
+    #    reason (str): String identifier representing the reason for the error
+    #
+    def set_last_task_error(self, domain, reason):
+        task_error = domain, reason
+        notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
+        self._notify(notification)
+
     #######################################################
     #                  Local Private Methods              #
     #######################################################
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e0a8d92..d907e93 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -31,7 +31,7 @@ from fnmatch import fnmatch
 from collections import deque
 
 from ._artifactelement import verify_artifact_ref, ArtifactElement
-from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError
+from ._exceptions import StreamError, ImplError, BstError, ArtifactElementError, ArtifactError, set_last_task_error
 from ._message import Message, MessageType
 from ._scheduler import (
     Scheduler,
@@ -1692,6 +1692,8 @@ class Stream:
             self._scheduler_terminated = True
         elif notification.notification_type == NotificationType.SUSPENDED:
             self._scheduler_suspended = not self._scheduler_suspended
+        elif notification.notification_type == NotificationType.TASK_ERROR:
+            set_last_task_error(*notification.task_error)
         else:
             raise StreamError("Unrecognised notification type received")
 


[buildstream] 13/19: Add support for logger print header displaying pipeline output

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 6a9db55874f0ed8354aed5677081f3584aa44084
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Thu Oct 24 17:04:29 2019 +0100

    Add support for logger print header displaying pipeline output
---
 src/buildstream/_frontend/app.py        |  3 +++
 src/buildstream/_frontend/widget.py     |  6 +++++-
 src/buildstream/_scheduler/scheduler.py |  5 ++++-
 src/buildstream/_stream.py              | 10 ++++++++++
 4 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 5fe38ce..704a489 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -233,6 +233,9 @@ class App:
                 indent=INDENT,
             )
 
+            # Register the Logline pipeline renderer callback in Stream
+            self.stream._pipeline_render_callback = self.logger.show_pipeline
+
             # Propagate pipeline feedback to the user
             self.context.messenger.set_message_handler(self._message_handler)
 
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 7c846bc..77de825 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -490,7 +490,11 @@ class LogLine(Widget):
 
         # Pipeline state
         text += self.content_profile.fmt("Pipeline\n", bold=True)
-        text += self.show_pipeline(stream.total_elements, context.log_element_format)
+        # Check if the output of show pipeline has already been generated for stream total elements
+        if stream.total_pipeline_render:
+            text += stream.total_pipeline_render
+        else:
+            text += self.show_pipeline(stream.total_elements, context.log_element_format)
         text += "\n"
 
         # Separator line before following output
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index ac52b0e..7a85f69 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -74,6 +74,7 @@ class NotificationType(FastEnum):
     ELEMENT_TOTALS = "element_totals"
     FINISH = "finish"
     SIGTSTP = "sigstp"
+    SHOW_PIPELINE = "show_pipeline"
 
 
 # Notification()
@@ -98,7 +99,8 @@ class Notification:
         task_error=None,
         exception=None,
         task_groups=None,
-        element_totals=None
+        element_totals=None,
+        show_pipeline=None
     ):
         self.notification_type = notification_type
         self.full_name = full_name
@@ -111,6 +113,7 @@ class Notification:
         self.exception = exception
         self.task_groups = task_groups  # Tuple of queue name, complete name, task change, & optional element name
         self.element_totals = element_totals
+        self.show_pipeline = show_pipeline  # Output of LogLine.show_pipeline() cb, to represent pipeline state
 
 
 # Scheduler()
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 789e14a..550fa0f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -94,6 +94,7 @@ class Stream:
         self.len_session_elements = ""
         self.len_total_elements = ""
         self.loop = None
+        self.total_pipeline_render = None
 
         #
         # Private members
@@ -121,6 +122,7 @@ class Stream:
         self._notify_back_queue = None
         self._casd_process = None
         self._watcher = None
+        self._pipeline_render_callback = None
 
     # init()
     #
@@ -1485,6 +1487,12 @@ class Stream:
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
         self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
 
+        # Also send through the pipeline renderer output for heading & summary rendering
+        total_pipeline_render = self._pipeline_render_callback(  # pylint: disable=not-callable
+            self.total_elements, self._context.log_element_format
+        )
+        self._notify_front(Notification(NotificationType.SHOW_PIPELINE, show_pipeline=total_pipeline_render))
+
         if self._session_start_callback is not None:
             self._notify_front(Notification(NotificationType.START))
 
@@ -1818,6 +1826,8 @@ class Stream:
             self._session_start_callback()
         elif notification.notification_type == NotificationType.ELEMENT_TOTALS:
             self.len_session_elements, self.len_total_elements = notification.element_totals
+        elif notification.notification_type == NotificationType.SHOW_PIPELINE:
+            self.total_pipeline_render = notification.show_pipeline
         elif notification.notification_type == NotificationType.FINISH:
             if self.loop:
                 self.loop.stop()