Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:23:29 UTC

[buildstream] 02/02: scheduler.py: Prioritise jobs from later queues

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fa8fb519a6568bf9ae09991cb1767c9bfc92a160
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Dec 7 15:02:11 2018 +0000

    scheduler.py: Prioritise jobs from later queues
    
    Previously, we were reversing the order of queues in
    _schedule_queue_jobs() in an attempt to avoid resource starvation of
    queues which share resource types. This was incorrect: jobs from
    earlier queues still became ready first and so were still scheduled
    first.
    
    Instead, we now schedule jobs starting from the most recently ready.
    This means that queues later in the scheduling process always take
    priority when they share a resource type with another queue.
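    
    A minimal sketch of the idea, using made-up job names and a single
    shared token rather than the real resource classes:
    
        # Jobs are appended in queue order, so later queues sit at the end.
        waiting_jobs = ["fetch:foo", "pull:bar", "build:baz"]
        tokens_available = 1                 # one shared token type
        
        started = []
        for job in reversed(waiting_jobs):   # later queues are considered first
            if tokens_available > 0:
                tokens_available -= 1
                started.append(job)
        
        print(started)   # ['build:baz'] -- the job from the latest queue wins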
    
    This also fixes a bug in _sched() caused by removing items from
    waiting_jobs while iterating over it. Aside from giving the loop
    O(n^2) complexity, this skipped the job immediately following each
    removed item in the iteration.
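    
    A minimal, self-contained demonstration of that pitfall (generic list
    items rather than the real job objects):
    
        jobs = ["a", "b", "c", "d"]
        visited = []
        for job in jobs:
            visited.append(job)
            jobs.remove(job)   # shifts later items left, under the iterator
        
        print(visited)   # ['a', 'c'] -- 'b' and 'd' were never visited
        print(jobs)      # ['b', 'd'] -- they are still "waiting"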
    
    This commit is related to issue #712
---
 buildstream/_scheduler/scheduler.py | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 0fe9962..3bad5bf 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -221,8 +221,7 @@ class Scheduler():
     # resources are available for them to run
     #
     def schedule_jobs(self, jobs):
-        for job in jobs:
-            self._waiting_jobs.append(job)
+        self._waiting_jobs.extend(jobs)
 
     # job_completed():
     #
@@ -270,20 +269,28 @@ class Scheduler():
     #
     def _sched(self):
 
-        for job in self._waiting_jobs:
+        uninitiated_jobs = []
+
+        # Iterate the list backwards because we want to give priority
+        # to queues later in the scheduling process when multiple
+        # queues share the same token type.
+        for job in reversed(self._waiting_jobs):
             self._resources.reserve_exclusive_resources(job)
 
-        for job in self._waiting_jobs:
+        for job in reversed(self._waiting_jobs):
             if not self._resources.reserve_job_resources(job):
+                uninitiated_jobs.append(job)
                 continue
 
             job.spawn()
-            self._waiting_jobs.remove(job)
             self.active_jobs.append(job)
 
             if self._job_start_callback:
                 self._job_start_callback(job)
 
+        uninitiated_jobs.reverse()
+        self._waiting_jobs = uninitiated_jobs
+
         # If nothings ticking, time to bail out
         if not self.active_jobs and not self._waiting_jobs:
             self.loop.stop()
@@ -312,19 +319,9 @@ class Scheduler():
                 # Dequeue processed elements for the next queue
                 elements = list(queue.dequeue())
 
-            # Kickoff whatever processes can be processed at this time
-            #
-            # We start by queuing from the last queue first, because
-            # we want to give priority to queues later in the
-            # scheduling process in the case that multiple queues
-            # share the same token type.
-            #
-            # This avoids starvation situations where we dont move on
-            # to fetch tasks for elements which failed to pull, and
-            # thus need all the pulls to complete before ever starting
-            # a build
+            # Get the jobs which have had their prerequisites met.
             ready.extend(chain.from_iterable(
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
+                queue.pop_ready_jobs() for queue in self.queues
             ))
 
             # pop_ready_jobs() may have skipped jobs, adding them to