You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:32:55 UTC
[buildstream] 03/04: WIP: _scheduler: introduce second 'high
priority' waiting jobs queue
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch phil/712
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 391bda38013bfee52bb39c57588e66c536056b7d
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Tue Nov 27 15:20:28 2018 +0000
WIP: _scheduler: introduce second 'high priority' waiting jobs queue
This reverts commit b23e5d16aa0eb5e0ba41fddc41bc9d29b1cad8dc.
---
buildstream/_scheduler/queues/queue.py | 1 -
buildstream/_scheduler/scheduler.py | 38 ++++++++++++++--------------------
2 files changed, 16 insertions(+), 23 deletions(-)
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 7df3bb1..909cebb 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -58,7 +58,6 @@ class Queue():
action_name = None
complete_name = None
resources = [] # Resources this queues' jobs want
- high_priority = False # If the jobs from this queue should be prioritised by the scheduler
def __init__(self, scheduler):
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index cfaa0a2..b76c730 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -25,7 +25,6 @@ from itertools import chain
import signal
import datetime
from contextlib import contextmanager
-from collections import deque
# Local imports
from .resources import Resources, ResourceType
@@ -72,16 +71,16 @@ class Scheduler():
#
# Public members
#
- self.active_jobs = [] # Jobs currently being run in the scheduler
- self.waiting_jobs = deque() # Jobs waiting for resources
- self.queues = None # Exposed for the frontend to print summaries
- self.context = context # The Context object shared with Queues
- self.terminated = False # Whether the scheduler was asked to terminate or has terminated
- self.suspended = False # Whether the scheduler is currently suspended
+ self.active_jobs = [] # Jobs currently being run in the scheduler
+ self.waiting_jobs = [] # Jobs waiting for resources
+ self.queues = None # Exposed for the frontend to print summaries
+ self.context = context # The Context object shared with Queues
+ self.terminated = False # Whether the scheduler was asked to terminate or has terminated
+ self.suspended = False # Whether the scheduler is currently suspended
# These are shared with the Job, but should probably be removed or made private in some way.
- self.loop = None # Shared for Job access to observe the message queue
- self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
+ self.loop = None # Shared for Job access to observe the message queue
+ self.internal_stops = 0 # Amount of SIGSTP signals we've introduced, this is shared with job.py
#
# Private members
@@ -216,15 +215,14 @@ class Scheduler():
#
# Args:
# jobs ([Job]): A list of jobs to schedule
- # priority_jobs([Job]): A list of jobs which should be prioritised over those in jobs
#
# Schedule 'Job's for the scheduler to run. Jobs scheduled will be
# run as soon any other queueing jobs finish, provided sufficient
- # resources are available for them to run.
+ # resources are available for them to run
#
- def schedule_jobs(self, jobs, priority_jobs):
- self.waiting_jobs.extend(jobs)
- self.waiting_jobs.extendleft(priority_jobs)
+ def schedule_jobs(self, jobs):
+ for job in jobs:
+ self.waiting_jobs.append(job)
# job_completed():
#
@@ -300,7 +298,6 @@ class Scheduler():
#
def _schedule_queue_jobs(self):
ready = []
- ready_priority = []
process_queues = True
while self._queue_jobs and process_queues:
@@ -325,19 +322,16 @@ class Scheduler():
# to fetch tasks for elements which failed to pull, and
# thus need all the pulls to complete before ever starting
# a build
- for queue in reversed(self.queues):
- ready_jobs = queue.pop_ready_jobs()
- if queue.high_priority:
- ready_priority.extend(ready_jobs)
- else:
- ready.extend(ready_jobs)
+ ready.extend(chain.from_iterable(
+ queue.pop_ready_jobs() for queue in reversed(self.queues)
+ ))
# pop_ready_jobs() may have skipped jobs, adding them to
# the done_queue. Pull these skipped elements forward to
# the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
- self.schedule_jobs(ready, ready_priority)
+ self.schedule_jobs(ready)
self._sched()
# _run_cleanup()