You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:01 UTC
[buildstream] 03/32: buildstream/_scheduler/*.py: Make job submission a queue job
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bc4cc296380f165da8eeb6d335ef6cb5b1f4f106
Author: Tristan Maat <tm...@tlater.net>
AuthorDate: Sun Jun 24 22:14:49 2018 +0000
buildstream/_scheduler/*.py: Make job submission a queue job
---
buildstream/_scheduler/jobs/job.py | 15 ++-
buildstream/_scheduler/queues/buildqueue.py | 3 +
buildstream/_scheduler/queues/fetchqueue.py | 2 +
buildstream/_scheduler/queues/pullqueue.py | 2 +
buildstream/_scheduler/queues/pushqueue.py | 2 +
buildstream/_scheduler/queues/queue.py | 14 +--
buildstream/_scheduler/queues/trackqueue.py | 2 +
buildstream/_scheduler/scheduler.py | 179 ++++++++++++++++++++--------
8 files changed, 154 insertions(+), 65 deletions(-)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index c567b6f..84be452 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -52,6 +52,14 @@ class Process(multiprocessing.Process):
self._sentinel = self._popen.sentinel
+class JobType():
+ FETCH = 1
+ TRACK = 2
+ BUILD = 3
+ PULL = 4
+ PUSH = 5
+
+
# Job()
#
# The Job object represents a parallel task, when calling Job.spawn(),
@@ -61,6 +69,7 @@ class Process(multiprocessing.Process):
#
# Args:
# scheduler (Scheduler): The scheduler
+# job_type (JobType): The type of the job
# action_name (str): The queue action name
# logfile (str): A template string that points to the logfile
# that should be used - should contain {pid}.
@@ -68,13 +77,14 @@ class Process(multiprocessing.Process):
#
class Job():
- def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+ def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0):
#
# Public members
#
self.action_name = action_name # The action name for the Queue
- self.child_data = None
+ self.child_data = None # Data to be sent to the main process
+ self.job_type = job_type # The type of the job
#
# Private members
@@ -541,6 +551,7 @@ class Job():
return
self._parent_complete(returncode == 0, self._result)
+ self._scheduler.job_completed(self)
# _parent_process_envelope()
#
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 50ba312..0c75538 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -18,7 +18,9 @@
# Tristan Van Berkom <tr...@codethink.co.uk>
# Jürg Billeter <ju...@codethink.co.uk>
+import os
from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
# A queue which assembles elements
@@ -28,6 +30,7 @@ class BuildQueue(Queue):
action_name = "Build"
complete_name = "Built"
queue_type = QueueType.BUILD
+ job_type = JobType.BUILD
def process(self, element):
element._assemble()
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bdff156..2438a9a 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -23,6 +23,7 @@ from ... import Consistency
# Local imports
from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
# A queue which fetches element sources
@@ -32,6 +33,7 @@ class FetchQueue(Queue):
action_name = "Fetch"
complete_name = "Fetched"
queue_type = QueueType.FETCH
+ job_type = JobType.FETCH
def __init__(self, scheduler, skip_cached=False):
super().__init__(scheduler)
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b4f5b0d..1fc4364 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -20,6 +20,7 @@
# Local imports
from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
# A queue which pulls element artifacts
@@ -29,6 +30,7 @@ class PullQueue(Queue):
action_name = "Pull"
complete_name = "Pulled"
queue_type = QueueType.FETCH
+ job_type = JobType.PULL
def process(self, element):
# returns whether an artifact was downloaded or not
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 624eefd..aa5540e 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -20,6 +20,7 @@
# Local imports
from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
# A queue which pushes element artifacts
@@ -29,6 +30,7 @@ class PushQueue(Queue):
action_name = "Push"
complete_name = "Pushed"
queue_type = QueueType.PUSH
+ job_type = JobType.PUSH
def process(self, element):
# returns whether an artifact was uploaded or not
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 7f115b4..39e431b 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -71,6 +71,7 @@ class Queue():
action_name = None
complete_name = None
queue_type = None
+ job_type = None
def __init__(self, scheduler):
@@ -241,16 +242,14 @@ class Queue():
logfile = self._element_log_path(element)
self.prepare(element)
- job = ElementJob(scheduler, self.action_name,
- logfile, element=element,
+ job = ElementJob(scheduler, self.job_type,
+ self.action_name, logfile,
+ element=element, queue=self,
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
scheduler.job_starting(job, element)
- job.spawn()
- self.active_jobs.append(job)
-
# These were not ready but were in the beginning, give em
# first priority again next time around
self._wait_queue.extendleft(unready)
@@ -350,11 +349,6 @@ class Queue():
# immediately before invoking another round of scheduling
self._scheduler.put_job_token(self.queue_type)
- # Notify frontend
- self._scheduler.job_completed(self, job, element, success)
-
- self._scheduler.sched()
-
# Convenience wrapper for Queue implementations to send
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 3a65f01..a371e52 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -24,6 +24,7 @@ from ... import SourceError
# Local imports
from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
# A queue which tracks sources
@@ -33,6 +34,7 @@ class TrackQueue(Queue):
action_name = "Track"
complete_name = "Tracked"
queue_type = QueueType.FETCH
+ job_type = JobType.TRACK
def process(self, element):
return element._track()
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index a7a3f95..ffbd656 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -21,11 +21,13 @@
# System imports
import os
import asyncio
+from itertools import chain
import signal
import datetime
from contextlib import contextmanager
# Local imports
+from .jobs import JobType
from .queues import QueueType
@@ -36,6 +38,59 @@ class SchedStatus():
TERMINATED = 1
+# A set of rules that dictates in which order jobs should run.
+#
+# The 'priority' tuple lists the job types that are not allowed to
+# be started before the current job completes (even if the current
+# job is still waiting to be executed).
+#
+# The 'exclusive' tuple lists the job types that the current job is
+# not allowed to be run in parallel with.
+#
+# Note that this is very different from the element job
+# dependencies. Both a build and fetch job can be ready at the same
+# time, this has nothing to do with the requirement to fetch sources
+# before building. These rules are purely in place to maintain cache
+# consistency.
+#
+JOB_RULES = {
+ JobType.CLEAN: {
+ # Build and pull jobs are not allowed to run when we are about
+ # to start a cleanup job, because they will add more data to
+ # the artifact cache.
+ 'priority': (JobType.BUILD, JobType.PULL),
+ # Cleanup jobs are not allowed to run in parallel with any
+ # jobs that might need to access the artifact cache, because
+ # we cannot guarantee atomicity otherwise.
+ 'exclusive': (JobType.BUILD, JobType.PULL, JobType.PUSH)
+ },
+ JobType.BUILD: {
+ 'priority': (),
+ 'exclusive': ()
+ },
+ JobType.FETCH: {
+ 'priority': (),
+ 'exclusive': ()
+ },
+ JobType.PULL: {
+ 'priority': (),
+ 'exclusive': ()
+ },
+ JobType.PUSH: {
+ 'priority': (),
+ 'exclusive': ()
+ },
+ JobType.SIZE: {
+ 'priority': (),
+ 'exclusive': ()
+ },
+ JobType.TRACK: {
+ 'priority': (),
+ 'exclusive': ()
+ }
+}
+
+
# Scheduler()
#
# The scheduler operates on a list queues, each of which is meant to accomplish
@@ -69,6 +124,8 @@ class Scheduler():
#
# Public members
#
+ self.waiting_jobs = [] # Jobs waiting for execution
+ self.active_jobs = [] # Jobs currently being run in the scheduler
self.queues = None # Exposed for the frontend to print summaries
self.context = context # The Context object shared with Queues
self.terminated = False # Whether the scheduler was asked to terminate or has terminated
@@ -129,7 +186,7 @@ class Scheduler():
self._connect_signals()
# Run the queues
- self.sched()
+ self.schedule_queue_jobs()
self.loop.run_forever()
self.loop.close()
@@ -220,11 +277,38 @@ class Scheduler():
# and process anything that is ready.
#
def sched(self):
+ for job in self.waiting_jobs:
+ # If our job is not allowed to run with any job currently
+ # running, we don't start it.
+ if any(running_job.job_type in JOB_RULES[job.job_type]['exclusive']
+ for running_job in self.active_jobs):
+ continue
+
+ # If any job currently waiting has priority over this one,
+ # we don't start it.
+ if any(job.job_type in JOB_RULES[waiting_job.job_type]['priority']
+ for waiting_job in self.waiting_jobs):
+ continue
+
+ job.spawn()
+ self.waiting_jobs.remove(job)
+ self.active_jobs.append(job)
+
+ if self._job_start_callback:
+ self._job_start_callback(job)
+
+ # If nothing's ticking, time to bail out
+ if not self.active_jobs and not self.waiting_jobs:
+ self.loop.stop()
+
+ def schedule_jobs(self, jobs):
+ self.waiting_jobs.extend(jobs)
+ def schedule_queue_jobs(self):
+ ready = []
process_queues = True
while self._queue_jobs and process_queues:
-
# Pull elements forward through queues
elements = []
for queue in self.queues:
@@ -233,31 +317,42 @@ class Scheduler():
# Dequeue processed elements for the next queue
elements = list(queue.dequeue())
- elements = list(elements)
# Kickoff whatever processes can be processed at this time
#
- # We start by queuing from the last queue first, because we want to
- # give priority to queues later in the scheduling process in the case
- # that multiple queues share the same token type.
+ # We start by queuing from the last queue first, because
+ # we want to give priority to queues later in the
+ # scheduling process in the case that multiple queues
+ # share the same token type.
#
- # This avoids starvation situations where we dont move on to fetch
- # tasks for elements which failed to pull, and thus need all the pulls
- # to complete before ever starting a build
- for queue in reversed(self.queues):
- queue.process_ready()
-
- # process_ready() may have skipped jobs, adding them to the done_queue.
- # Pull these skipped elements forward to the next queue and process them.
+ # This avoids starvation situations where we don't move on
+ # to fetch tasks for elements which failed to pull, and
+ # thus need all the pulls to complete before ever starting
+ # a build
+ ready.extend(chain.from_iterable(
+ queue.process_ready() for queue in reversed(self.queues)
+ ))
+
+ # process_ready() may have skipped jobs, adding them to
+ # the done_queue. Pull these skipped elements forward to
+ # the next queue and process them.
process_queues = any(q.dequeue_ready() for q in self.queues)
- # If nothings ticking, time to bail out
- ticking = 0
- for queue in self.queues:
- ticking += len(queue.active_jobs)
+ self.schedule_jobs(ready)
+ self.sched()
- if ticking == 0:
- self.loop.stop()
+ # job_completed():
+ #
+ # Called when a Job completes
+ #
+ # Args:
+ # job (Job): The completed Job; it is removed from the
+ # list of active jobs, after which the queues are asked
+ # to schedule any jobs that are now ready
+ #
+ def job_completed(self, job):
+ self.active_jobs.remove(job)
+ self.schedule_queue_jobs()
# get_job_token():
#
@@ -290,30 +385,6 @@ class Scheduler():
def put_job_token(self, queue_type):
self._job_tokens[queue_type] += 1
- # job_starting():
- #
- # Called by the Queue when starting a Job
- #
- # Args:
- # job (Job): The starting Job
- #
- def job_starting(self, job, element):
- if self._job_start_callback:
- self._job_start_callback(element, job.action_name)
-
- # job_completed():
- #
- # Called by the Queue when a Job completes
- #
- # Args:
- # queue (Queue): The Queue holding a complete job
- # job (Job): The completed Job
- # success (bool): Whether the Job completed with a success status
- #
- def job_completed(self, queue, job, element, success):
- if self._job_complete_callback:
- self._job_complete_callback(element, queue, job.action_name, success)
-
#######################################################
# Local Private Methods #
#######################################################
@@ -400,18 +471,20 @@ class Scheduler():
wait_start = datetime.datetime.now()
wait_limit = 20.0
- # First tell all jobs to terminate
+ active_jobs = self.active_jobs
for queue in self.queues:
- for job in queue.active_jobs:
- job.terminate()
+ active_jobs.extend(queue.active_jobs)
+
+ # First tell all jobs to terminate
+ for job in active_jobs:
+ job.terminate()
# Now wait for them to really terminate
- for queue in self.queues:
- for job in queue.active_jobs:
- elapsed = datetime.datetime.now() - wait_start
- timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
- if not job.terminate_wait(timeout):
- job.kill()
+ for job in active_jobs:
+ elapsed = datetime.datetime.now() - wait_start
+ timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+ if not job.terminate_wait(timeout):
+ job.kill()
self.loop.stop()