You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:36:21 UTC

[buildstream] 08/16: Move sched notification poll to loop reader

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tpollard/buildsubtemp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f0f29863751531d43db3626ffc2cc06b0845dc5b
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Oct 2 11:30:10 2019 +0100

    Move sched notification poll to loop reader
---
 src/buildstream/_scheduler/scheduler.py | 32 +++++++++++++++++---------------
 1 file changed, 17 insertions(+), 15 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index a17d7d2..a0a87fb 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -202,16 +202,17 @@ class Scheduler():
         self._casd_process = casd_process
         _watcher = asyncio.get_child_watcher()
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
-        
-        # Add notification handler
-        if self._notify_back_queue:
-            self.loop.call_later(0.01, self._loop)
+
+        # Add notification listener if in subprocess
+        self._start_listening()
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
             self._sched()
             self.loop.run_forever()
+            # Stop listening for notifications
+            self._stop_listening()
             self.loop.close()
 
         # Stop watching casd
@@ -386,7 +387,7 @@ class Scheduler():
     #
     def _abort_on_casd_failure(self, pid, returncode):
         message = Message(MessageType.BUG, "buildbox-casd died while the pipeline was active.")
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
         self._casd_process.returncode = returncode
         self.terminate_jobs()
@@ -626,16 +627,17 @@ class Scheduler():
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_back_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
-        self.loop.call_later(0.01, self._loop)
+        while not self._notify_back_queue.empty():
+            notification = self._notify_back_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_back_queue:
+            self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_back_queue:
+            self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing