You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:31:02 UTC

[buildstream] 08/19: Send scheduler notifications over a multiprocessing queue

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/ui-split-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e39c94064de749d31bf41987fccaa19fc129a975
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Thu Jun 20 10:18:49 2019 +0100

    Send scheduler notifications over a multiprocessing queue
---
 src/buildstream/_scheduler/scheduler.py | 17 ++++++++++-------
 src/buildstream/_stream.py              |  4 +++-
 2 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9e120e4..14ecf30 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -98,7 +98,7 @@ class Notification:
 class Scheduler():
 
     def __init__(self, context,
-                 start_time, state, notification_handler,
+                 start_time, state, notification_queue,
                  interrupt_callback=None,
                  ticker_callback=None,
                  interactive_failure=False):
@@ -131,8 +131,8 @@ class Scheduler():
         self._cleanup_scheduled = False       # Whether we have a cleanup job scheduled
         self._cleanup_running = None          # A running CleanupJob, or None
 
-        # Callback to send notifications to report back to the Scheduler's owner
-        self.notify = notification_handler
+        # Message to send notifications back to the Scheduler's owner
+        self._notification_queue = notification_queue
 
         # Whether our exclusive jobs, like 'cleanup' are currently already
         # waiting or active.
@@ -305,7 +305,7 @@ class Scheduler():
                                     job_status=status,
                                     failed_element=job.element_job,
                                     element=element)
-        self.notify(notification)
+        self._notify(notification)
 
         if process_jobs:
             # Now check for more jobs
@@ -369,7 +369,7 @@ class Scheduler():
                                     full_name=job.name,
                                     job_action=job.action_name,
                                     elapsed_time=self.elapsed_time())
-        self.notify(notification)
+        self._notify(notification)
         job.start()
 
     # Callback for the cache size job
@@ -589,7 +589,7 @@ class Scheduler():
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self.notify(notification)
+        self._notify(notification)
 
     # _terminate_event():
     #
@@ -649,9 +649,12 @@ class Scheduler():
     # Regular timeout for driving status in the UI
     def _tick(self):
         notification = Notification(NotificationType.TICK)
-        self.notify(notification)
+        self._notify(notification)
         self.loop.call_later(1, self._tick)
 
+    def _notify(self, notification):
+        self._notification_queue.put(notification)
+
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing
         # are enabling the 'spawn' method of starting child processes, and
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 45bb41b..2d451c6 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -21,6 +21,7 @@
 
 import itertools
 import functools
+import multiprocessing as mp
 import os
 import sys
 import stat
@@ -76,6 +77,7 @@ class Stream():
         #
         # Private members
         #
+        self._notification_queue = mp.Queue()
         self._context = context
         self._artifacts = None
         self._sourcecache = None
@@ -85,7 +87,7 @@ class Stream():
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler,
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_queue,
                                     interrupt_callback=interrupt_callback,
                                     ticker_callback=ticker_callback,
                                     interactive_failure=interactive_failure)