Posted to commits@buildstream.apache.org by tv...@apache.org on 2022/08/19 08:11:16 UTC

[buildstream] 01/03: _scheduler: Add concept of imperative queues

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/quit-only-imperative-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 58564f258f0a2f1cf8b2f20b8ada2748c8b7ba44
Author: Tristan van Berkom <tr...@codethink.co.uk>
AuthorDate: Fri Aug 19 17:07:55 2022 +0900

    _scheduler: Add concept of imperative queues
    
    This impacts the behavior of quitting the scheduler (as opposed to terminating it).
    
    If Scheduler.stop() is called, then we will stop processing jobs from
    the imperative queue or any preceding queues and only continue to process
    post-imperative queues.
    
    Related to #534
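
As a rough, standalone illustration of the behavior described above (not part
of this commit): the Queue class, the queue names and the select_queues()
helper below are stand-ins that mirror the new queue-selection branch added to
scheduler.py in this change, not BuildStream's real classes.

    # Stand-in Queue: only the new `imperative` flag matters for this sketch
    class Queue:
        def __init__(self, name, *, imperative=False):
            self.name = name
            self.imperative = imperative

    def select_queues(queues, queue_jobs):
        # Mirrors the new branch in scheduler.py: once stop() has been called
        # (queue_jobs is False), keep only the imperative queue and every queue
        # after it; with no queue marked imperative, queue no further jobs.
        if queue_jobs:
            return queues
        for queue in queues:
            if queue.imperative:
                return queues[queues.index(queue):]
        return []

    pipeline = [Queue("fetch"), Queue("build", imperative=True), Queue("push")]

    print([q.name for q in select_queues(pipeline, queue_jobs=True)])   # ['fetch', 'build', 'push']
    print([q.name for q in select_queues(pipeline, queue_jobs=False)])  # ['build', 'push']
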
---
 src/buildstream/_scheduler/queues/queue.py |  7 ++++++-
 src/buildstream/_scheduler/scheduler.py    | 28 ++++++++++++++++++++++------
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 76688e75d..e9f934d57 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -64,7 +64,7 @@ class Queue:
     # Resources this queues' jobs want
     resources = []  # type: List[int]
 
-    def __init__(self, scheduler):
+    def __init__(self, scheduler, *, imperative=False):
 
         #
         # Private members
@@ -78,6 +78,11 @@ class Queue:
 
         self._required_element_check = False  # Whether we should check that elements are required before enqueuing
 
+        #
+        # Public members
+        #
+        self.imperative = imperative
+
         # Assert the subclass has setup class data
         assert self.action_name is not None
         assert self.complete_name is not None
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d8e1c0f3..bba7de703 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -266,8 +266,8 @@ class Scheduler:
 
     # stop()
     #
-    # Stop queueing additional jobs, causes Scheduler.run()
-    # to return once all currently processing jobs are finished.
+    # Stop queueing additional imperative jobs, causing Scheduler.run()
+    # to return once all post-imperative jobs are finished.
     #
     def stop(self):
         self._queue_jobs = False
@@ -354,11 +354,27 @@ class Scheduler:
         ready = []
         process_queues = True
 
-        while self._queue_jobs and process_queues:
+        if self._queue_jobs:
+            # Scheduler.stop() was not called, consider all queues.
+            queues = self.queues
+        else:
+            # Limit processing to post-imperative queues
+            for queue in self.queues:
+                if queue.imperative:
+                    # Here the `queues` list will consist of the imperative queue along with
+                    # any subsequent queues; this means elements will only be carried forward
+                    # from the imperative queue onwards, and only post-imperative jobs will be processed.
+                    queues = self.queues[self.queues.index(queue) :]
+                    break
+            else:
+                # No imperative queue was marked, stop queueing any jobs
+                queues = []
+
+        while process_queues:
 
             # Pull elements forward through queues
             elements = []
-            for queue in self.queues:
+            for queue in queues:
                 queue.enqueue(elements)
                 elements = list(queue.dequeue())
 
@@ -373,13 +389,13 @@ class Scheduler:
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues)))
+            ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(queues)))
 
             # harvest_jobs() may have decided to skip some jobs, making
             # them eligible for promotion to the next queue as a side effect.
             #
             # If that happens, do another round.
-            process_queues = any(q.dequeue_ready() for q in self.queues)
+            process_queues = any(q.dequeue_ready() for q in queues)
 
         # Start the jobs
         #
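
For completeness, a rough sketch of how a queue subclass could opt in to the
new keyword argument added to queue.py above. The classes below are
illustrative stubs; BuildStream's real queue subclasses and their exact
constructor signatures may differ.

    # Stub base class mirroring the new Queue.__init__() signature
    class Queue:
        action_name = None
        complete_name = None

        def __init__(self, scheduler, *, imperative=False):
            self._scheduler = scheduler
            self.imperative = imperative

    # Hypothetical subclass that forwards the flag to the base class
    class BuildQueue(Queue):
        action_name = "Build"
        complete_name = "Built"

        def __init__(self, scheduler, *, imperative=False):
            super().__init__(scheduler, imperative=imperative)

    scheduler = object()  # placeholder; a real Scheduler instance would go here
    build_queue = BuildQueue(scheduler, imperative=True)
    assert build_queue.imperative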