You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:32:55 UTC

[buildstream] 03/04: WIP: _schedular: introduce second 'high priority' waiting jobs queue

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch phil/712
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 391bda38013bfee52bb39c57588e66c536056b7d
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Tue Nov 27 15:20:28 2018 +0000

    WIP: _schedular: introduce second 'high priority' waiting jobs queue
    
    This reverts commit b23e5d16aa0eb5e0ba41fddc41bc9d29b1cad8dc.
---
 buildstream/_scheduler/queues/queue.py |  1 -
 buildstream/_scheduler/scheduler.py    | 38 ++++++++++++++--------------------
 2 files changed, 16 insertions(+), 23 deletions(-)

diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 7df3bb1..909cebb 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -58,7 +58,6 @@ class Queue():
     action_name = None
     complete_name = None
     resources = []                     # Resources this queues' jobs want
-    high_priority = False              # If the jobs from this queue should be prioritised by the scheduler
 
     def __init__(self, scheduler):
 
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index cfaa0a2..b76c730 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -25,7 +25,6 @@ from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
-from collections import deque
 
 # Local imports
 from .resources import Resources, ResourceType
@@ -72,16 +71,16 @@ class Scheduler():
         #
         # Public members
         #
-        self.active_jobs = []        # Jobs currently being run in the scheduler
-        self.waiting_jobs = deque()  # Jobs waiting for resources
-        self.queues = None           # Exposed for the frontend to print summaries
-        self.context = context       # The Context object shared with Queues
-        self.terminated = False      # Whether the scheduler was asked to terminate or has terminated
-        self.suspended = False       # Whether the scheduler is currently suspended
+        self.active_jobs = []       # Jobs currently being run in the scheduler
+        self.waiting_jobs = []      # Jobs waiting for resources
+        self.queues = None          # Exposed for the frontend to print summaries
+        self.context = context      # The Context object shared with Queues
+        self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
+        self.suspended = False      # Whether the scheduler is currently suspended
 
         # These are shared with the Job, but should probably be removed or made private in some way.
-        self.loop = None             # Shared for Job access to observe the message queue
-        self.internal_stops = 0      # Amount of SIGSTP signals we've introduced, this is shared with job.py
+        self.loop = None            # Shared for Job access to observe the message queue
+        self.internal_stops = 0     # Amount of SIGSTP signals we've introduced, this is shared with job.py
 
         #
         # Private members
@@ -216,15 +215,14 @@ class Scheduler():
     #
     # Args:
     #     jobs ([Job]): A list of jobs to schedule
-    #     priority_jobs([Job]): A list of jobs which should be prioritised over those in jobs
     #
     # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
     # run as soon any other queueing jobs finish, provided sufficient
-    # resources are available for them to run.
+    # resources are available for them to run
     #
-    def schedule_jobs(self, jobs, priority_jobs):
-            self.waiting_jobs.extend(jobs)
-            self.waiting_jobs.extendleft(priority_jobs)
+    def schedule_jobs(self, jobs):
+        for job in jobs:
+            self.waiting_jobs.append(job)
 
     # job_completed():
     #
@@ -300,7 +298,6 @@ class Scheduler():
     #
     def _schedule_queue_jobs(self):
         ready = []
-        ready_priority = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
@@ -325,19 +322,16 @@ class Scheduler():
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            for queue in reversed(self.queues):
-                ready_jobs = queue.pop_ready_jobs()
-                if queue.high_priority:
-                    ready_priority.extend(ready_jobs)
-                else:
-                    ready.extend(ready_jobs)
+            ready.extend(chain.from_iterable(
+                queue.pop_ready_jobs() for queue in reversed(self.queues)
+            ))
 
             # pop_ready_jobs() may have skipped jobs, adding them to
             # the done_queue.  Pull these skipped elements forward to
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        self.schedule_jobs(ready, ready_priority)
+        self.schedule_jobs(ready)
         self._sched()
 
     # _run_cleanup()