Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:32:56 UTC

[buildstream] 04/04: fixup! WIP: _schedular: introduce second 'high priority' waiting jobs queue

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch phil/712
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 3b63a8fc0cab6d94510e8bab5679348a3e644e93
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Tue Nov 27 15:20:28 2018 +0000

    fixup! WIP: _schedular: introduce second 'high priority' waiting jobs queue
---
 buildstream/_scheduler/queues/fetchqueue.py |  1 +
 buildstream/_scheduler/queues/queue.py      |  1 +
 buildstream/_scheduler/scheduler.py         | 58 +++++++++++++++++------------
 3 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 446dbbd..5c441ba 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -33,6 +33,7 @@ class FetchQueue(Queue):
     action_name = "Fetch"
     complete_name = "Fetched"
     resources = [ResourceType.DOWNLOAD]
+    high_priority = True
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 909cebb..3c04140 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -58,6 +58,7 @@ class Queue():
     action_name = None
     complete_name = None
     resources = []                     # Resources this queues' jobs want
+    high_priority = False              # Whether jobs from this queue should be prioritised by the scheduler
 
     def __init__(self, scheduler):
 
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c730..1b05415 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -71,12 +71,13 @@ class Scheduler():
         #
         # Public members
         #
-        self.active_jobs = []       # Jobs currently being run in the scheduler
-        self.waiting_jobs = []      # Jobs waiting for resources
-        self.queues = None          # Exposed for the frontend to print summaries
-        self.context = context      # The Context object shared with Queues
-        self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
-        self.suspended = False      # Whether the scheduler is currently suspended
+        self.active_jobs = []           # Jobs currently being run in the scheduler
+        self.waiting_jobs = []          # Jobs waiting for resources
+        self.waiting_priority_jobs = [] # High priority jobs waiting for resources
+        self.queues = None              # Exposed for the frontend to print summaries
+        self.context = context          # The Context object shared with Queues
+        self.terminated = False         # Whether the scheduler was asked to terminate or has terminated
+        self.suspended = False          # Whether the scheduler is currently suspended
 
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None            # Shared for Job access to observe the message queue
@@ -220,7 +221,9 @@ class Scheduler():
     # run as soon any other queueing jobs finish, provided sufficient
     # resources are available for them to run
     #
-    def schedule_jobs(self, jobs):
+    def schedule_jobs(self, jobs, priority_jobs):
+        for job in priority_jobs:
+            self.waiting_priority_jobs.append(job)
         for job in jobs:
             self.waiting_jobs.append(job)
 
@@ -257,7 +260,7 @@ class Scheduler():
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        self.schedule_jobs([job], [])
 
     #######################################################
     #                  Local Private Methods              #
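
The signature change threads through every call site: the ordinary list comes first and the priority list second, so callers with no priority work pass an empty second argument (as the cleanup job above now does), while the queue scheduling path further down passes both. A runnable stand-in showing just the argument order and where each list ends up; the string jobs are placeholders for real Job instances:

waiting_jobs = []
waiting_priority_jobs = []

def schedule_jobs(jobs, priority_jobs):
    # Same effect as the patched method: each argument feeds its own waiting list.
    waiting_priority_jobs.extend(priority_jobs)
    waiting_jobs.extend(jobs)

schedule_jobs(["cache-cleanup"], [])                     # no priority work
schedule_jobs(["build/alpha.bst"], ["fetch/beta.bst"])   # fetch job joins the priority list
print(waiting_priority_jobs)                             # ['fetch/beta.bst']
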
@@ -269,22 +272,27 @@ class Scheduler():
     # automatically when Scheduler.run() is called initially,
     #
     def _sched(self):
-        for job in self.waiting_jobs:
-            self._resources.reserve_exclusive_resources(job)
+        def allocate_resources_and_spawn_jobs(job_list):
+            for job in job_list:
+                self._resources.reserve_exclusive_resources(job)
+
+            for job in job_list:
+                if not self._resources.reserve_job_resources(job):
+                    continue
 
-        for job in self.waiting_jobs:
-            if not self._resources.reserve_job_resources(job):
-                continue
+                job.spawn()
+                job_list.remove(job)
+                self.active_jobs.append(job)
 
-            job.spawn()
-            self.waiting_jobs.remove(job)
-            self.active_jobs.append(job)
+                if self._job_start_callback:
+                    self._job_start_callback(job)
 
-            if self._job_start_callback:
-                self._job_start_callback(job)
+        # Process jobs from the high priority list first
+        allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
+        allocate_resources_and_spawn_jobs(self.waiting_jobs)
 
         # If nothings ticking, time to bail out
-        if not self.active_jobs and not self.waiting_jobs:
+        if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
             self.loop.stop()
 
     # _schedule_queue_jobs()
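
The refactor folds the previous reserve-and-spawn loop into a local helper and runs it twice, priority list first, so high priority jobs get first claim on the available resources. A self-contained sketch of that ordering, with toy stand-ins (ToyJob, can_run) in place of BuildStream's Job and resource accounting; unlike the patch, the sketch iterates over a snapshot of each list so that removing a spawned job cannot skip the entry after it:

class ToyJob:
    def __init__(self, name):
        self.name = name

def sched(waiting_priority_jobs, waiting_jobs, active_jobs, can_run):
    def allocate_and_spawn(job_list):
        for job in list(job_list):       # snapshot; removal below mutates job_list
            if not can_run(job):         # stands in for reserve_job_resources()
                continue
            print("spawn", job.name)     # stands in for job.spawn()
            job_list.remove(job)
            active_jobs.append(job)

    # Same order as the patched _sched(): high priority jobs first.
    allocate_and_spawn(waiting_priority_jobs)
    allocate_and_spawn(waiting_jobs)

active = []
sched([ToyJob("fetch/beta.bst")], [ToyJob("build/alpha.bst")], active, can_run=lambda job: True)
# prints "spawn fetch/beta.bst" before "spawn build/alpha.bst"
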
@@ -298,6 +306,7 @@ class Scheduler():
     #
     def _schedule_queue_jobs(self):
         ready = []
+        ready_priority = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
@@ -322,16 +331,19 @@ class Scheduler():
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            ready.extend(chain.from_iterable(
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
-            ))
+
+            for queue in reversed(self.queues):
+                if queue.high_priority:
+                    ready_priority.extend(queue.pop_ready_jobs())
+                else:
+                    ready.extend(queue.pop_ready_jobs())
 
             # pop_ready_jobs() may have skipped jobs, adding them to
             # the done_queue.  Pull these skipped elements forward to
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        self.schedule_jobs(ready)
+        self.schedule_jobs(ready, ready_priority)
         self._sched()
 
     # _run_cleanup()
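
With this last hunk the ready jobs are partitioned per queue rather than chained into a single list, and each queue's high_priority flag decides which waiting list its batch feeds. A small self-contained sketch of the partitioning, with stub objects standing in for real Queue instances:

from types import SimpleNamespace

# Stub queues carrying only the attributes the partitioning logic reads.
queues = [
    SimpleNamespace(high_priority=False, ready=["build/alpha.bst"]),
    SimpleNamespace(high_priority=True,  ready=["fetch/beta.bst"]),
]

ready = []
ready_priority = []

# Mirrors the loop in _schedule_queue_jobs(): later queues are visited first.
for queue in reversed(queues):
    popped, queue.ready = queue.ready, []   # stands in for queue.pop_ready_jobs()
    if queue.high_priority:
        ready_priority.extend(popped)
    else:
        ready.extend(popped)

print(ready_priority)   # ['fetch/beta.bst']
print(ready)            # ['build/alpha.bst']
# These two lists are then handed to schedule_jobs(ready, ready_priority).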