You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:23:27 UTC

[buildstream] branch phil/712-_sched-refactor created (now fa8fb51)

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a change to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at fa8fb51  scheduler.py: Prioritise jobs from later queues

This branch includes the following new commits:

     new bdf9c14  scheduler.py: make waiting_jobs private
     new fa8fb51  scheduler.py: Prioritise jobs from later queues

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/02: scheduler.py: make waiting_jobs private

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit bdf9c14a2cee47daef35200f72f94f42c945d2d1
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Dec 7 13:47:28 2018 +0000

    scheduler.py: make waiting_jobs private
---
 buildstream/_scheduler/scheduler.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c730..0fe9962 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -72,7 +72,6 @@ class Scheduler():
         # Public members
         #
         self.active_jobs = []       # Jobs currently being run in the scheduler
-        self.waiting_jobs = []      # Jobs waiting for resources
         self.queues = None          # Exposed for the frontend to print summaries
         self.context = context      # The Context object shared with Queues
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
@@ -85,6 +84,7 @@ class Scheduler():
         #
         # Private members
         #
+        self._waiting_jobs = []      # Jobs waiting for resources
         self._interrupt_callback = interrupt_callback
         self._ticker_callback = ticker_callback
         self._job_start_callback = job_start_callback
@@ -222,7 +222,7 @@ class Scheduler():
     #
     def schedule_jobs(self, jobs):
         for job in jobs:
-            self.waiting_jobs.append(job)
+            self._waiting_jobs.append(job)
 
     # job_completed():
     #
@@ -269,22 +269,23 @@ class Scheduler():
     # automatically when Scheduler.run() is called initially,
     #
     def _sched(self):
-        for job in self.waiting_jobs:
+
+        for job in self._waiting_jobs:
             self._resources.reserve_exclusive_resources(job)
 
-        for job in self.waiting_jobs:
+        for job in self._waiting_jobs:
             if not self._resources.reserve_job_resources(job):
                 continue
 
             job.spawn()
-            self.waiting_jobs.remove(job)
+            self._waiting_jobs.remove(job)
             self.active_jobs.append(job)
 
             if self._job_start_callback:
                 self._job_start_callback(job)
 
         # If nothings ticking, time to bail out
-        if not self.active_jobs and not self.waiting_jobs:
+        if not self.active_jobs and not self._waiting_jobs:
             self.loop.stop()
 
     # _schedule_queue_jobs()
@@ -460,7 +461,7 @@ class Scheduler():
                 job.kill()
 
         # Clear out the waiting jobs
-        self.waiting_jobs = []
+        self._waiting_jobs = []
 
     # Regular timeout for driving status in the UI
     def _tick(self):


[buildstream] 02/02: scheduler.py: Prioritise jobs from later queues

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fa8fb519a6568bf9ae09991cb1767c9bfc92a160
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Dec 7 15:02:11 2018 +0000

    scheduler.py: Prioritise jobs from later queues
    
    Previously, we were reversing the order of queues in
    _schedule_queue_jobs() in an attempt to avoid resource starvation of
    queues which share resource types. This was ineffective, as jobs from
    earlier queues were still ready first and so were scheduled first.
    
    Instead, we now schedule jobs starting from the most recently ready
    one. This means that queues later in the scheduling process will
    always have priority when they share a resource type with another
    queue.
    
    This also fixes a bug in _sched() arising from the fact that we were
    removing items from waiting_jobs while iterating through it.
    Aside from giving the loop O(n^2) complexity, this caused the job
    following each removed item to be skipped in the iteration.
    
    This commit is related to issue #712
---
 buildstream/_scheduler/scheduler.py | 31 ++++++++++++++-----------------
 1 file changed, 14 insertions(+), 17 deletions(-)

diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 0fe9962..3bad5bf 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -221,8 +221,7 @@ class Scheduler():
     # resources are available for them to run
     #
     def schedule_jobs(self, jobs):
-        for job in jobs:
-            self._waiting_jobs.append(job)
+            self._waiting_jobs.extend(jobs)
 
     # job_completed():
     #
@@ -270,20 +269,28 @@ class Scheduler():
     #
     def _sched(self):
 
-        for job in self._waiting_jobs:
+        uninitiated_jobs = []
+
+        # Iterate the list backwards because we want to give priority
+        # to queues later in the scheduling process when multiple
+        # queues share the same token type.
+        for job in reversed(self._waiting_jobs):
             self._resources.reserve_exclusive_resources(job)
 
-        for job in self._waiting_jobs:
+        for job in reversed(self._waiting_jobs):
             if not self._resources.reserve_job_resources(job):
+                uninitiated_jobs.append(job)
                 continue
 
             job.spawn()
-            self._waiting_jobs.remove(job)
             self.active_jobs.append(job)
 
             if self._job_start_callback:
                 self._job_start_callback(job)
 
+        uninitiated_jobs.reverse()
+        self._waiting_jobs = uninitiated_jobs
+
         # If nothings ticking, time to bail out
         if not self.active_jobs and not self._waiting_jobs:
             self.loop.stop()
@@ -312,19 +319,9 @@ class Scheduler():
                 # Dequeue processed elements for the next queue
                 elements = list(queue.dequeue())
 
-            # Kickoff whatever processes can be processed at this time
-            #
-            # We start by queuing from the last queue first, because
-            # we want to give priority to queues later in the
-            # scheduling process in the case that multiple queues
-            # share the same token type.
-            #
-            # This avoids starvation situations where we dont move on
-            # to fetch tasks for elements which failed to pull, and
-            # thus need all the pulls to complete before ever starting
-            # a build
+            # Get the jobs which have had their prerequisites met.
             ready.extend(chain.from_iterable(
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
+                queue.pop_ready_jobs() for queue in self.queues
             ))
 
             # pop_ready_jobs() may have skipped jobs, adding them to