You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:32:04 UTC

[buildstream] 07/10: Make it more verbose with front & back notifications

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tpollard/streamasync
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 676b700301204947e57a0a54a18f580c6c2a5b00
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Fri Sep 27 17:46:46 2019 +0100

    Make it more verbose with front & back notifications
---
 src/buildstream/_scheduler/scheduler.py | 48 +++++++++++------------
 src/buildstream/_stream.py              | 68 ++++++++++++++++-----------------
 2 files changed, 58 insertions(+), 58 deletions(-)

diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index bb3fac5..e90efdb 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -153,8 +153,8 @@ class Scheduler():
         self._state = state
 
         # Bidirectional pipe to send notifications back to the Scheduler's owner
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
         # Notifier callback to use if not running in a subprocess
         self._notifier = notifier
 
@@ -188,7 +188,7 @@ class Scheduler():
         asyncio.set_event_loop(self.loop)
 
         # Notify that the loop has been created
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         # Add timeouts
         self.loop.call_later(1, self._tick)
@@ -197,7 +197,7 @@ class Scheduler():
         self._connect_signals()
 
         # Add notification handler
-        if self._notify_back:
+        if self._notify_back_queue:
             self.loop.call_later(0.01, self._loop)
 
         # Start the profiler
@@ -214,7 +214,7 @@ class Scheduler():
         self.loop = None
 
         # Notify that the loop has been reset
-        self._notify(Notification(NotificationType.RUNNING))
+        self._notify_front(Notification(NotificationType.RUNNING))
 
         if failed:
             status = SchedStatus.ERROR
@@ -224,12 +224,12 @@ class Scheduler():
             status = SchedStatus.SUCCESS
 
         # Send the state taskgroups if we're running under the subprocess
-        if self._notify_front:
+        if self._notify_front_queue:
             # Don't pickle state
             for group in self._state.task_groups.values():
                 group._state = None
             notification = Notification(NotificationType.TASK_GROUPS, task_groups=self._state.task_groups)
-            self._notify_front.put(notification)
+            self._notify_front_queue.put(notification)
 
         return status
 
@@ -268,7 +268,7 @@ class Scheduler():
 
         # Notify the frontend that we're terminated as it might be
         # from an interactive prompt callback or SIGTERM
-        self._notify(Notification(NotificationType.TERMINATED))
+        self._notify_front(Notification(NotificationType.TERMINATED))
         self.loop.call_soon(self._terminate_jobs_real)
 
         # Block this until we're finished terminating jobs,
@@ -329,7 +329,7 @@ class Scheduler():
                                     job_action=job.action_name,
                                     job_status=status,
                                     element=element_info)
-        self._notify(notification)
+        self._notify_front(notification)
         self._sched()
 
     # notify_messenger()
@@ -341,7 +341,7 @@ class Scheduler():
     #                       handler, as assigned by context's messenger.
     #
     def notify_messenger(self, message):
-        self._notify(Notification(NotificationType.MESSAGE, message=message))
+        self._notify_front(Notification(NotificationType.MESSAGE, message=message))
 
     # set_last_task_error()
     #
@@ -356,7 +356,7 @@ class Scheduler():
         task_error = domain, reason
         notification = Notification(NotificationType.TASK_ERROR,
                                     task_error=task_error)
-        self._notify(notification)
+        self._notify_front(notification)
 
     #######################################################
     #                  Local Private Methods              #
@@ -375,7 +375,7 @@ class Scheduler():
                                     full_name=job.name,
                                     job_action=job.action_name,
                                     time=self._state.elapsed_time(start_time=self._starttime))
-        self._notify(notification)
+        self._notify_front(notification)
         job.start()
 
     # _sched_queue_jobs()
@@ -460,7 +460,7 @@ class Scheduler():
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
             # Notify that we're suspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             for job in self._active_jobs:
                 job.suspend()
 
@@ -474,9 +474,9 @@ class Scheduler():
                 job.resume()
             self.suspended = False
             # Notify that we're unsuspended
-            self._notify(Notification(NotificationType.SUSPENDED))
+            self._notify_front(Notification(NotificationType.SUSPENDED))
             self._starttime += (datetime.datetime.now() - self._suspendtime)
-            self._notify(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
+            self._notify_front(Notification(NotificationType.SCHED_START_TIME, time=self._starttime))
             self._suspendtime = None
 
     # _interrupt_event():
@@ -494,7 +494,7 @@ class Scheduler():
             return
 
         notification = Notification(NotificationType.INTERRUPT)
-        self._notify(notification)
+        self._notify_front(notification)
 
     # _terminate_event():
     #
@@ -553,7 +553,7 @@ class Scheduler():
 
     # Regular timeout for driving status in the UI
     def _tick(self):
-        self._notify(Notification(NotificationType.TICK))
+        self._notify_front(Notification(NotificationType.TICK))
         self.loop.call_later(1, self._tick)
 
     def _failure_retry(self, action_name, unique_id):
@@ -568,14 +568,14 @@ class Scheduler():
         queue._task_group.failed_tasks.remove(element._get_full_name())
         queue.enqueue([element])
 
-    def _notify(self, notification):
+    def _notify_front(self, notification):
         # Check if we need to call the notifier callback
-        if self._notify_front:
-            self._notify_front.put(notification)
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
             self._notifier(notification)
 
-    def _stream_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TERMINATE:
             self.terminate_jobs()
         elif notification.notification_type == NotificationType.QUIT:
@@ -592,12 +592,12 @@ class Scheduler():
             raise ValueError("Unrecognised notification type received")
 
     def _loop(self):
-        assert self._notify_back
+        assert self._notify_back_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_back.get_nowait()
-                self._stream_notification_handler(notification)
+                notification = self._notify_back_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index d01605e..839636c 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -90,18 +90,17 @@ class Stream():
         context.messenger.set_state(self._state)
 
         # Scheduler may use callback for notification depending on whether it's subprocessed
-        self._scheduler = Scheduler(context, session_start, self._state, self._scheduler_notification_handler)
+        self._scheduler = Scheduler(context, session_start, self._state, self._notification_handler)
 
         self._first_non_track_queue = None
         self._session_start_callback = session_start_callback
         self._ticker_callback = ticker_callback
         self._interrupt_callback = interrupt_callback
-        self._notifier = self._scheduler._stream_notification_handler  # Assign the schedulers notification handler
         self._scheduler_running = False
         self._scheduler_terminated = False
         self._scheduler_suspended = False
-        self._notify_front = None
-        self._notify_back = None
+        self._notify_front_queue = None
+        self._notify_back_queue = None
 
     # init()
     #
@@ -129,14 +128,14 @@ class Stream():
         mp_context = mp.get_context(method='fork')
         process_name = "stream-{}".format(func.__name__)
 
-        self._notify_front = mp.Queue()
-        self._notify_back = mp.Queue()
+        self._notify_front_queue = mp.Queue()
+        self._notify_back_queue = mp.Queue()
         # Tell the scheduler to not use the notifier callback
-        self._scheduler._notify_front = self._notify_front
-        self._scheduler._notify_back = self._notify_back
+        self._scheduler._notify_front_queue = self._notify_front_queue
+        self._scheduler._notify_back_queue = self._notify_back_queue
 
         args = list(args)
-        args.insert(0, self._notify_front)
+        args.insert(0, self._notify_front_queue)
         args.insert(0, func)
 
         self._subprocess = mp_context.Process(target=Stream._subprocess_main, args=args,
@@ -157,8 +156,8 @@ class Stream():
         # Ensure no more notifcations to process
         try:
             while True:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
         except queue.Empty:
             pass
 
@@ -169,7 +168,7 @@ class Stream():
     #
     def cleanup(self):
         # Close the notification queue
-        for q in [self._notify_back, self._notify_front]:
+        for q in [self._notify_back_queue, self._notify_front_queue]:
             if q is not None:
                 q.close
         #self._notification_queue.cancel_join_thread()
@@ -1196,7 +1195,7 @@ class Stream():
     #
     def terminate(self):
         notification = Notification(NotificationType.TERMINATE)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # quit()
     #
@@ -1206,7 +1205,7 @@ class Stream():
     #
     def quit(self):
         notification = Notification(NotificationType.QUIT)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # suspend()
     #
@@ -1216,11 +1215,11 @@ class Stream():
     def suspend(self):
         # Send the notification to suspend jobs
         notification = Notification(NotificationType.SUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
         yield
         # Unsuspend jobs on context exit
         notification = Notification(NotificationType.UNSUSPEND)
-        self._notify(notification)
+        self._notify_back(notification)
 
     #############################################################
     #                    Private Methods                        #
@@ -1426,7 +1425,7 @@ class Stream():
         notification = Notification(NotificationType.RETRY,
                                     job_action=action_name,
                                     element=unique_id)
-        self._notify(notification)
+        self._notify_back(notification)
 
     # _run()
     #
@@ -1440,18 +1439,13 @@ class Stream():
         self.total_elements = list(self._pipeline.dependencies(self.targets, Scope.ALL))
 
         if self._session_start_callback is not None:
-            if self._notify_front:
-                self._notify_front.put(Notification(NotificationType.START))
-            else:
-                self._session_start_callback()
+            self._notify_front(Notification(NotificationType.START))
 
         # Also send through the session & total elements list lengths for status rendering
         element_totals = str(len(self.session_elements)), str(len(self.total_elements))
-        if self._notify_front:
-            self._notify_front.put(Notification(NotificationType.ELEMENT_TOTALS,
-                                                element_totals=element_totals))
-        else:
-            self.len_session_elements, self.len_total_elements = element_totals
+        self._notify_front(Notification(NotificationType.ELEMENT_TOTALS,
+                                        element_totals=element_totals))
+
 
         status = self._scheduler.run(self.queues)
 
@@ -1745,7 +1739,7 @@ class Stream():
 
         return element_targets, artifact_refs
 
-    def _scheduler_notification_handler(self, notification):
+    def _notification_handler(self, notification):
         if notification.notification_type == NotificationType.TASK_GROUPS:
             self._state.task_groups = notification.task_groups
         elif notification.notification_type == NotificationType.MESSAGE:
@@ -1782,21 +1776,27 @@ class Stream():
         else:
             raise StreamError("Unrecognised notification type received")
 
-    def _notify(self, notification):
-        if self._notify_back:
-            self._notify_back.put(notification)
+    def _notify_back(self, notification):
+        if self._notify_back_queue:
+            self._notify_back_queue.put(notification)
+        else:
+            self._scheduler._notification_handler(notification)
+
+    def _notify_front(self, notification):
+        if self._notify_front_queue:
+            self._notify_front_queue.put(notification)
         else:
-            self._scheduler._stream_notification_handler(notification)
+            self._notification_handler(notification)
 
     # The code to be run by the Stream's event loop while delegating
     # work to a subprocess with the @subprocessed decorator
     def _loop(self):
-        assert self._notify_front
+        assert self._notify_front_queue
         # Check for and process new messages
         while True:
             try:
-                notification = self._notify_front.get_nowait()
-                self._scheduler_notification_handler(notification)
+                notification = self._notify_front_queue.get_nowait()
+                self._notification_handler(notification)
             except queue.Empty:
                 notification = None
                 break