You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:28 UTC
[buildstream] 30/32: Add resource manager
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e1dec47158a93aed3235bcbe9d33a70b1aee7b11
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Mon Jul 16 10:50:40 2018 +0100
Add resource manager
---
buildstream/_scheduler/resources.py | 105 ++++++++++++++++++++++++++++++++++++
1 file changed, 105 insertions(+)
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
new file mode 100644
index 0000000..bbf851b
--- /dev/null
+++ b/buildstream/_scheduler/resources.py
@@ -0,0 +1,105 @@
+class ResourceType():
+ CACHE = 0
+ DOWNLOAD = 1
+ PROCESS = 2
+ UPLOAD = 3
+
+
+class Resources():
+ def __init__(self, num_builders, num_fetchers, num_pushers):
+ self._max_resources = {
+ ResourceType.CACHE: 1,
+ ResourceType.DOWNLOAD: num_fetchers,
+ ResourceType.PROCESS: num_builders,
+ ResourceType.UPLOAD: num_pushers
+ }
+
+ # Resources jobs are currently using.
+ self._used_resources = {
+ ResourceType.CACHE: 0,
+ ResourceType.DOWNLOAD: 0,
+ ResourceType.PROCESS: 0,
+ ResourceType.UPLOAD: 0
+ }
+
+ # Resources jobs currently want exclusive access to. The set
+ # of jobs that have asked for exclusive access is the value -
+ # this is so that we can avoid scheduling any other jobs until
+ # *all* exclusive jobs that "register interest" have finished
+ # - which avoids starving them of scheduling time.
+ self._exclusive_resources = {
+ ResourceType.CACHE: set(),
+ ResourceType.DOWNLOAD: set(),
+ ResourceType.PROCESS: set(),
+ ResourceType.UPLOAD: set()
+ }
+
+ def clear_job_resources(self, job):
+ for resource in job.exclusive_resources:
+ self._exclusive_resources[resource].remove(hash(job))
+
+ for resource in job.resources:
+ self._used_resources[resource] -= 1
+
+ def reserve_exclusive_resources(self, job):
+ exclusive = job.exclusive_resources
+
+ # The very first thing we do is to register any exclusive
+ # resources this job may want. Even if the job is not yet
+ # allowed to run (because another job is holding the resource
+ # it wants), we can still set this - it just means that any
+ # job *currently* using these resources has to finish first,
+ # and no new jobs wanting these can be launched (except other
+ # exclusive-access jobs).
+ #
+ for resource in exclusive:
+ self._exclusive_resources[resource].add(hash(job))
+
+ def reserve_job_resources(self, job):
+ # First, we check if the job wants to access a resource that
+ # another job wants exclusive access to. If so, it cannot be
+ # scheduled.
+ #
+ # Note that if *both* jobs want this exclusively, we don't
+ # fail yet.
+ #
+ # FIXME: I *think* we can deadlock if two jobs want disjoint
+ # sets of exclusive and non-exclusive resources. This
+ # is currently not possible, but may be worth thinking
+ # about.
+ #
+ for resource in job.resources - job.exclusive_resources:
+ # If our job wants this resource exclusively, we never
+ # check this, so we can get away with not (temporarily)
+ # removing it from the set.
+ if self._exclusive_resources[resource]:
+ return False
+
+ # Now we check if anything is currently using any resources
+ # this job wants exclusively. If so, the job cannot be
+ # scheduled.
+ #
+ # Since jobs that use a resource exclusively are also using
+ # it, this means only one exclusive job can ever be scheduled
+ # at a time, despite being allowed to be part of the exclusive
+ # set.
+ #
+ for exclusive in job.exclusive_resources:
+ if self._used_resources[exclusive] != 0:
+ return False
+
+ # Finally, we check if we have enough of each resource
+ # available. If we don't have enough, the job cannot be
+ # scheduled.
+ for resource in job.resources:
+ if (self._max_resources[resource] > 0 and
+ self._used_resources[resource] >= self._max_resources[resource]):
+ return False
+
+ # Now we register the fact that our job is using the resources
+ # it asked for, and tell the scheduler that it is allowed to
+ # continue.
+ for resource in job.resources:
+ self._used_resources[resource] += 1
+
+ return True