You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:32:05 UTC

[buildstream] 08/10: Move sched notification poll to loop reader

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/streamasync
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a58996467542be85ffc1200729bac1b2211b4d41
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Wed Oct 2 11:30:10 2019 +0100

    Move sched notification poll to loop reader
---
 src/buildstream/_scheduler/scheduler.py | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index e90efdb..122ba37 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -196,15 +196,16 @@ class Scheduler():
         # Handle unix signals while running
         self._connect_signals()
 
-        # Add notification handler
-        if self._notify_back_queue:
-            self.loop.call_later(0.01, self._loop)
+        # Add notification listener if in subprocess
+        self._start_listening()
 
         # Start the profiler
         with PROFILER.profile(Topics.SCHEDULER, "_".join(queue.action_name for queue in self.queues)):
             # Run the queues
             self._sched()
             self.loop.run_forever()
+            # Stop listening for notifications
+            self._stop_listening()
             self.loop.close()
 
         # Stop handling unix signals
@@ -592,16 +593,17 @@ class Scheduler():
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back_queue
-        # Check for and process new messages
-        while True:
-            try:
-                notification = self._notify_back_queue.get_nowait()
-                self._notification_handler(notification)
-            except queue.Empty:
-                notification = None
-                break
-        self.loop.call_later(0.01, self._loop)
+        while not self._notify_back_queue.empty():
+            notification = self._notify_back_queue.get_nowait()
+            self._notification_handler(notification)
+
+    def _start_listening(self):
+        if self._notify_back_queue:
+            self.loop.add_reader(self._notify_back_queue._reader.fileno(), self._loop)
+
+    def _stop_listening(self):
+        if self._notify_back_queue:
+            self.loop.remove_reader(self._notify_back_queue._reader.fileno())
 
     def __getstate__(self):
         # The only use-cases for pickling in BuildStream at the time of writing