You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:18:30 UTC

[buildstream] 08/19: Make it more verbose with front & back notifications

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tpollard/temp
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit bf1741582db3316e00d980e8b012b5b35d64d635
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 17:46:46 2019 +0100

    Make it more verbose with front & back notifications
---
 src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------
 src/buildstream/_stream.py              | 67 ++++++++++++++++-----------------
 2 files changed, 56 insertions(+), 59 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 6b5f306..3476162 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -157,8 +157,8 @@ class Scheduler:
         self._casd_process = None  # handle to the casd process for monitoring purpose
 
         # Bidirectional pipe to send notifications back to the Scheduler's owner
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
         # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
@@ -190,7 +190,7 @@ class Scheduler:
         asyncio.set_event_loop(self.loop)
 
         # Notify that the loop has been created
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         # Add timeouts
         self.loop.call_later(1, self._tick)
@@ -204,7 +204,7 @@ class Scheduler:
         _watcher.add_child_handler(casd_process.pid, self._abort_on_casd_failure)
 
         # Add notification handler
-        if self._notify_back:
+        if self._notify_back_queue:
             self.loop.call_later(0.01, self._loop)
 
         # Start the profiler
@@ -225,7 +225,7 @@ class Scheduler:
         self.loop = None
 
         # Notify that the loop has been reset
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         if failed:
             status = SchedStatus.ERROR
@@ -235,12 +235,12 @@ class Scheduler:
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front:
+        if self._notify_front_queue:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
             notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front.put(notification)
+            self._notify_front_queue.put(notification)
 
         return status
 
@@ -279,7 +279,7 @@ class Scheduler:
 
         # Notify the frontend that we're terminated as it might be
         # from an interactive prompt callback or SIGTERM
-        self._notify(Notification(NotificationType.TERMINATED))
+        self._notify_front(Notification(NotificationType.TERMINATED))
         self.loop.call_soon(self._terminate_jobs_real)
 
         # Block this until we're finished terminating jobs,
@@ -342,7 +342,7 @@ class Scheduler:
             job_status=status,
             element=element_info,
         )
-        self._notify(notification)
+        self._notify_front(notification)
         self._sched()
 
     # notify_messenger()
@@ -354,7 +354,7 @@ class Scheduler:
     #                       handler, as assigned by context's messenger.
     #
     def notify_messenger(self, message):
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
     # set_last_task_error()
     #
@@ -368,7 +368,7 @@ class Scheduler:
     def set_last_task_error(self, domain, reason):
         task_error = domain, reason
         notification = Notification(NotificationType.TASK_ERROR, task_error=task_error)
-        self._notify(notification)
+        self._notify_front(notification)
 
     #######################################################
     #                  Local Private Methods              #
@@ -407,7 +407,7 @@ class Scheduler:
             job_action=job.action_name,
             time=self._state.elapsed_time(start_time=self._starttime),
         )
-        self._notify(notification)
+        self._notify_front(notification)
         job.start()
 
     # _sched_queue_jobs()
@@ -497,7 +497,7 @@ class Scheduler:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
             # Notify that we're suspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             for job in self._active_jobs:
                 job.suspend()
 
@@ -511,9 +511,9 @@ class Scheduler:
                 job.resume()
             self.suspended = False
             # Notify that we're unsuspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             self._starttime += datetime.datetime.now() - self._suspendtime
-            self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
+            self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
             self._suspendtime = None
 
     # _interrupt_event():
@@ -529,7 +529,7 @@ class Scheduler:
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self._notify(notification)
+        self._notify_front(notification)
 
     # _terminate_event():
     #
@@ -582,7 +582,7 @@ class Scheduler:
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._notify(Notification(NotificationType.TICK))
+        self._notify_front(Notification(NotificationType.TICK))
         self.loop.call_later(1, self._tick)
 
     def _failure_retry(self, action_name, unique_id):
@@ -597,14 +597,14 @@ class Scheduler:
         queue._task_group.failed_tasks.remove(element._get_full_name())
         queue.enqueue([element])
 
-    def _notify(self, notification):
+    def _notify_front(self, notification):
         # Check if we need to call the notifier callback
-        if self._notify_front:
-            self._notify_front.put(notification)
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
             self._notifier(notification)
 
-    def _stream_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -621,12 +621,12 @@ class Scheduler:
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back
+        assert self._notify_back_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_back.get_nowait()
-                self._stream_notification_handler(notification)
+                notification = self._notify_back_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 841ccde..c8758da 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -106,17 +106,16 @@ class Stream:
 
         context.messenger.set_state(self._state)
 
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler)
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
         self._interrupt_callback = interrupt_callback
-        self._notifier = self._scheduler._stream_notification_handler  # Assign the schedulers notification handler
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
 
     # init()
     #
@@ -145,14 +144,14 @@ class Stream:
         mp_context = mp.get_context(method="fork")
         process_name = "stream-{}".format(func.__name__)
 
-        self._notify_front = mp.Queue()
-        self._notify_back = mp.Queue()
+        self._notify_front_queue = mp.Queue()
+        self._notify_back_queue = mp.Queue()
         # Tell the scheduler to not use the notifier callback
-        self._scheduler._notify_front = self._notify_front
-        self._scheduler._notify_back = self._notify_back
+        self._scheduler._notify_front_queue = self._notify_front_queue
+        self._scheduler._notify_back_queue = self._notify_back_queue
 
         args = list(args)
-        args.insert(0, self._notify_front)
+        args.insert(0, self._notify_front_queue)
         args.insert(0, func)
 
         self._subprocess = mp_context.Process(
@@ -172,8 +171,8 @@ class Stream:
         # Ensure no more notifcations to process
         try:
             while True:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
         except queue.Empty:
             print("Finished processing notifications")
             pass
@@ -184,7 +183,7 @@ class Stream:
     #
     def cleanup(self):
         # Close the notification queue
-        for q in [self._notify_back, self._notify_front]:
+        for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close()
         # self._notification_queue.cancel_join_thread()
@@ -1222,7 +1221,7 @@ class Stream:
     #
     def terminate(self):
         notification = Notification(NotificationType.TERMINATE)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # quit()
     #
@@ -1232,7 +1231,7 @@ class Stream:
     #
     def quit(self):
         notification = Notification(NotificationType.QUIT)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # suspend()
     #
@@ -1242,11 +1241,11 @@ class Stream:
     def suspend(self):
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
 
     #############################################################
     #                    Private Methods                        #
@@ -1435,7 +1434,7 @@ class Stream:
     #
     def _failure_retry(self, action_name, unique_id):
         notification = Notification(NotificationType.RETRY, job_action=action_name, element=unique_id)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # _run()
     #
@@ -1449,17 +1448,11 @@ class Stream:
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            if self._notify_front:
-                self._notify_front.put(Notification(NotificationType.START))
-            else:
-                self._session_start_callback()
+            self._notify_front(Notification(NotificationType.START))
 
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
-        if self._notify_front:
-            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
-        else:
-            self.len_session_elements, self.len_total_elements = element_totals
+        self._notify_front(Notification(NotificationType.ELEMENT_TOTALS, element_totals=element_totals))
 
         status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd_process())
 
@@ -1749,7 +1742,7 @@ class Stream:
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
             self._state.task_groups = notification.task_groups
         elif notification.notification_type == NotificationType.MESSAGE:
@@ -1784,23 +1777,27 @@ class Stream:
         else:
             raise StreamError("Unrecognised notification type received")
 
-    def _notify(self, notification):
-        # Set that the notifcation is for the scheduler
-        # notification.for_scheduler = True
-        if self._notify_back:
-            self._notify_back.put(notification)
+    def _notify_back(self, notification):
+        if self._notify_back_queue:
+            self._notify_back_queue.put(notification)
+        else:
+            self._scheduler._notification_handler(notification)
+
+    def _notify_front(self, notification):
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
-            self._scheduler._stream_notification_handler(notification)
+            self._notification_handler(notification)
 
     # The code to be run by the Stream's event loop while delegating
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front
+        assert self._notify_front_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break