You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:23:27 UTC
[buildstream] branch phil/712-_sched-refactor created (now fa8fb51)
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a change to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git.
at fa8fb51 scheduler.py: Prioritise jobs from later queues
This branch includes the following new commits:
new bdf9c14 scheduler.py: make waiting_jobs private
new fa8fb51 scheduler.py: Prioritise jobs from later queues
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[buildstream] 01/02: scheduler.py: make waiting_jobs private
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bdf9c14a2cee47daef35200f72f94f42c945d2d1
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Dec 7 13:47:28 2018 +0000
scheduler.py: make waiting_jobs private
---
buildstream/_scheduler/scheduler.py | 15 ++++++++-------
1 file changed, 8 insertions(+), 7 deletions(-)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c730..0fe9962 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -72,7 +72,6 @@ class Scheduler():
# Public members
#
self.active_jobs = [] # Jobs currently being run in the scheduler
- self.waiting_jobs = [] # Jobs waiting for resources
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -85,6 +84,7 @@ class Scheduler():
#
# Private members
#
+ self._waiting_jobs = [] # Jobs waiting for resources
self._interrupt_callback = interrupt_callback
self._ticker_callback = ticker_callback
self._job_start_callback = job_start_callback
@@ -222,7 +222,7 @@ class Scheduler():
#
def schedule_jobs(self, jobs):
for job in jobs:
- self.waiting_jobs.append(job)
+ self._waiting_jobs.append(job)
# job_completed():
#
@@ -269,22 +269,23 @@ class Scheduler():
# automatically when Scheduler.run() is called initially,
#
def _sched(self):
- for job in self.waiting_jobs:
+
+ for job in self._waiting_jobs:
self._resources.reserve_exclusive_resources(job)
- for job in self.waiting_jobs:
+ for job in self._waiting_jobs:
if not self._resources.reserve_job_resources(job):
continue
job.spawn()
- self.waiting_jobs.remove(job)
+ self._waiting_jobs.remove(job)
self.active_jobs.append(job)
if self._job_start_callback:
self._job_start_callback(job)
# If nothings ticking, time to bail out
- if not self.active_jobs and not self.waiting_jobs:
+ if not self.active_jobs and not self._waiting_jobs:
self.loop.stop()
# _schedule_queue_jobs()
@@ -460,7 +461,7 @@ class Scheduler():
job.kill()
# Clear out the waiting jobs
- self.waiting_jobs = []
+ self._waiting_jobs = []
# Regular timeout for driving status in the UI
def _tick(self):
[buildstream] 02/02: scheduler.py: Prioritise jobs from later queues
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch phil/712-_sched-refactor
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit fa8fb519a6568bf9ae09991cb1767c9bfc92a160
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Fri Dec 7 15:02:11 2018 +0000
scheduler.py: Prioritise jobs from later queues
Previously, we were reversing the order of queues in
_schedule_queue_jobs() in an attempt to avoid resource starvation of
queues which share resource types. This was incorrect as the earlier
jobs were still ready first and so were scheduled first.
Instead, we now schedule jobs starting from the one that most recently
became ready. This means that queues later in the scheduling process
will always have priority when they share a resource type with another queue.
This also fixes a bug in _sched() arising from the fact that we were
removing items from waiting_jobs while iterating through it.
Aside from giving the loop O(n^2) complexity, this caused the job
immediately following each removed item to be skipped during the
iteration.
This commit is related to issue #712
---
buildstream/_scheduler/scheduler.py | 31 ++++++++++++++-----------------
1 file changed, 14 insertions(+), 17 deletions(-)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 0fe9962..3bad5bf 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -221,8 +221,7 @@ class Scheduler():
# resources are available for them to run
#
def schedule_jobs(self, jobs):
- for job in jobs:
- self._waiting_jobs.append(job)
+ self._waiting_jobs.extend(jobs)
# job_completed():
#
@@ -270,20 +269,28 @@ class Scheduler():
#
def _sched(self):
- for job in self._waiting_jobs:
+ uninitiated_jobs = []
+
+ # Iterate the list backwards because we want to give priority
+ # to queues later in the scheduling process when multiple
+ # queues share the same token type.
+ for job in reversed(self._waiting_jobs):
self._resources.reserve_exclusive_resources(job)
- for job in self._waiting_jobs:
+ for job in reversed(self._waiting_jobs):
if not self._resources.reserve_job_resources(job):
+ uninitiated_jobs.append(job)
continue
job.spawn()
- self._waiting_jobs.remove(job)
self.active_jobs.append(job)
if self._job_start_callback:
self._job_start_callback(job)
+ uninitiated_jobs.reverse()
+ self._waiting_jobs = uninitiated_jobs
+
# If nothings ticking, time to bail out
if not self.active_jobs and not self._waiting_jobs:
self.loop.stop()
@@ -312,19 +319,9 @@ class Scheduler():
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
- # Kickoff whatever processes can be processed at this time
- #
- # We start by queuing from the last queue first, because
- # we want to give priority to queues later in the
- # scheduling process in the case that multiple queues
- # share the same token type.
- #
- # This avoids starvation situations where we dont move on
- # to fetch tasks for elements which failed to pull, and
- # thus need all the pulls to complete before ever starting
- # a build
+ # Get the jobs which have had their prerequisites met.
ready.extend(chain.from_iterable(
- queue.pop_ready_jobs() for queue in reversed(self.queues)
+ queue.pop_ready_jobs() for queue in self.queues
))
# pop_ready_jobs() may have skipped jobs, adding them to