You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:12:55 UTC

[buildstream] 03/04: _scheduler/scheduler.py: Only run one cache size job at a time

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/one-cache-size-job-2
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8fd2cd9ee4e9ef78dde9bdc2b3a13db64076b6cd
Author: Tristan Van Berkom <tr...@codethink.co.uk>
AuthorDate: Sun Jan 6 14:15:46 2019 -0500

    _scheduler/scheduler.py: Only run one cache size job at a time
    
    When queuing the special cache management related cleanup and
    cache size jobs, we now treat these jobs as special and do the
    following:
    
      * Avoid queueing a cleanup/cache_size job if one is already queued
    
        We just drop redundantly queued jobs here.
    
      * Ensure that jobs of this type only run one at a time
    
        This could have been done with the Resources mechanism;
        however, as these special jobs have the same properties and
        are basically owned by the Scheduler, it seemed more
        straightforward to handle the behaviors of these special jobs together.
    
    This fixes issue #753
---
 buildstream/_scheduler/scheduler.py | 53 +++++++++++++++++++++++++++++++------
 1 file changed, 45 insertions(+), 8 deletions(-)

diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 8facb08..ecbfef3 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -38,6 +38,16 @@ class SchedStatus():
     TERMINATED = 1


+# Our _REDUNDANT_EXCLUSIVE_ACTIONS jobs are special ones
+# which we launch dynamically. They have the property of being
+# meaningless to queue if one is already queued, and it also
+# doesn't make sense to run them in parallel.
+#
+_ACTION_NAME_CLEANUP = 'cleanup'
+_ACTION_NAME_CACHE_SIZE = 'cache_size'
+_REDUNDANT_EXCLUSIVE_ACTIONS = [_ACTION_NAME_CLEANUP, _ACTION_NAME_CACHE_SIZE]
+


 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -94,6 +104,15 @@ class Scheduler():
         self._suspendtime = None
         self._queue_jobs = True      # Whether we should continue to queue jobs

+        # Sets of the action names of our exclusive jobs (like 'cleanup')
+        # which are currently waiting or active, respectively.
+        #
+        # This is just a bit quicker than scanning the wait queue and active
+        # queue and comparing job action names.
+        #
+        self._exclusive_waiting = set()
+        self._exclusive_active = set()
+
         self._resources = Resources(context.sched_builders,
                                     context.sched_fetchers,
                                     context.sched_pushers)
@@ -223,6 +242,8 @@ class Scheduler():
     def job_completed(self, job, success):
         self._resources.clear_job_resources(job)
         self.active_jobs.remove(job)
+        if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+            self._exclusive_active.remove(job.action_name)  # unblock postponed jobs of this action
         self._job_complete_callback(job, success)
         self._schedule_queue_jobs()
         self._sched()
@@ -233,14 +254,9 @@ class Scheduler():
     # size is calculated, a cleanup job will be run automatically
     # if needed.
     #
-    # FIXME: This should ensure that only one cache size job
-    #        is ever pending at a given time. If a cache size
-    #        job is already running, it is correct to queue
-    #        a new one, it is incorrect to have more than one
-    #        of these jobs pending at a given time, though.
-    #
     def check_cache_size(self):
-        job = CacheSizeJob(self, 'cache_size', 'cache_size/cache_size',
+        job = CacheSizeJob(self, _ACTION_NAME_CACHE_SIZE,
+                           'cache_size/cache_size',
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            complete_cb=self._run_cleanup)
@@ -263,10 +279,19 @@ class Scheduler():
+            # Postpone these jobs if one is already running. This is checked
+            # before reserving resources so a postponed job does not leak them.
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS and \
+               job.action_name in self._exclusive_active:
+                continue
             if not self._resources.reserve_job_resources(job):
                 continue

             job.spawn()
             self.waiting_jobs.remove(job)
             self.active_jobs.append(job)

+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+                self._exclusive_waiting.remove(job.action_name)
+                self._exclusive_active.add(job.action_name)
+
             if self._job_start_callback:
                 self._job_start_callback(job)

@@ -287,6 +312,18 @@ class Scheduler():
     #
     def _schedule_jobs(self, jobs):
         for job in jobs:
+
+            # Special treatment of our redundant exclusive jobs
+            #
+            if job.action_name in _REDUNDANT_EXCLUSIVE_ACTIONS:
+
+                # Drop the job if one with the same action is already waiting
+                if job.action_name in self._exclusive_waiting:
+                    continue
+
+                # Mark this action as waiting; it moves to the active set once its job is spawned
+                self._exclusive_waiting.add(job.action_name)
+
             self.waiting_jobs.append(job)

     # _schedule_queue_jobs()
@@ -355,7 +392,7 @@ class Scheduler():
         if not artifacts.has_quota_exceeded():
             return

-        job = CleanupJob(self, 'cleanup', 'cleanup/cleanup',
+        job = CleanupJob(self, _ACTION_NAME_CLEANUP, 'cleanup/cleanup',
                          resources=[ResourceType.CACHE,
                                     ResourceType.PROCESS],
                          exclusive_resources=[ResourceType.CACHE])