Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:48:25 UTC

[buildstream] 09/11: Clean the artifact cache when we hit the cache quota

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 135-expire-artifacts-in-local-cache
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 15de94cf14bd1e81ead39a12965e87ec11a953e6
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Jul 17 13:05:51 2018 +0100

    Clean the artifact cache when we hit the cache quota
    
    When the cache quota is hit, we will remove the artifacts of any
    elements not required for the current build, until the cache is only
    half full.
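    
    The eviction policy is easier to read in isolation. Below is a
    minimal, self-contained sketch of the loop; the function signature
    and the RuntimeError are illustrative stand-ins (the implementation
    in this commit raises ArtifactError and measures sizes through the
    CAS):
    
        def clean(artifacts_lru, sizes, required, cache_size, quota, lower_threshold):
            # Evict least-recently-used artifacts first, skipping any
            # that are required, until usage drops below the target.
            target = quota - lower_threshold
            while cache_size >= target:
                if not artifacts_lru:
                    if cache_size > quota:
                        raise RuntimeError("Cache too full: everything left is required")
                    break  # over the target, but still within quota
                name = artifacts_lru.pop(0)  # oldest atime first
                if name not in required:
                    cache_size -= sizes[name]
            return cache_size
    
        # Example: quota 100, lower threshold 50 -> cleans from 90 down to 30
        print(clean(["a", "b", "c"], {"a": 30, "b": 30, "c": 30}, {"c"}, 90, 100, 50))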
---
 buildstream/_artifactcache/artifactcache.py | 110 +++++++++++++++++++++++++++-
 buildstream/_artifactcache/cascache.py      |   6 ++
 buildstream/_scheduler/jobs/__init__.py     |   1 +
 buildstream/_scheduler/jobs/cleanupjob.py   |  72 ++++++++++++++++++
 buildstream/_scheduler/scheduler.py         |  16 +++-
 buildstream/_stream.py                      |   8 ++
 6 files changed, 210 insertions(+), 3 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 9abe68c..5feae93 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -21,7 +21,8 @@ import os
 import string
 from collections import Mapping, namedtuple
 
-from .._exceptions import ImplError, LoadError, LoadErrorReason
+from ..element import _KeyStrength
+from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
 from .._message import Message, MessageType
 from .. import utils
 from .. import _yaml
@@ -77,6 +78,7 @@ ArtifactCacheSpec.__new__.__defaults__ = (None, None, None)
 class ArtifactCache():
     def __init__(self, context):
         self.context = context
+        self.required_artifacts = set()
         self.extractdir = os.path.join(context.artifactdir, 'extract')
         self.max_size = context.cache_quota
         self.estimated_size = None
@@ -183,6 +185,75 @@ class ArtifactCache():
                                   (str(provenance)))
         return cache_specs
 
+    # append_required_artifacts():
+    #
+    # Mark the artifacts of the given elements as required for the
+    # current run. The cache keys of these artifacts are recorded, and
+    # the artifact cache will not remove them for the duration of the
+    # current pipeline.
+    #
+    # Args:
+    #     elements (iterable): A set of elements to mark as required
+    #
+    def append_required_artifacts(self, elements):
+        # We lock both strong and weak keys - deleting one but not the
+        # other won't save space in most cases anyway, but would be a
+        # user inconvenience.
+
+        for element in elements:
+            strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
+            weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
+
+            for key in (strong_key, weak_key):
+                if key and key not in self.required_artifacts:
+                    self.required_artifacts.add(key)
+
+                    # We also update the usage times of any artifacts
+                    # we will be using, which helps prevent a
+                    # buildstream process running in parallel with
+                    # this one from removing artifacts that are in use.
+                    try:
+                        self.update_atime(key)
+                    except ArtifactError:
+                        pass
+
+    # clean():
+    #
+    # Clean the artifact cache as much as possible.
+    #
+    def clean(self):
+        artifacts = self.list_artifacts()
+
+        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
+            try:
+                to_remove = artifacts.pop(0)
+            except IndexError:
+                # If we have run out of removable artifacts (everything
+                # left is required by this build), we have to abort.
+                #
+                # FIXME: Asking the user what to do may be neater
+                default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
+                                            'buildstream.conf')
+                detail = ("There is not enough space to build the given element.\n"
+                          "Please increase the cache-quota in {}."
+                          .format(self.context.config_origin or default_conf))
+
+                if self.calculate_cache_size() > self.context.cache_quota:
+                    raise ArtifactError("Cache too full. Aborting.",
+                                        detail=detail,
+                                        reason="cache-too-full")
+                else:
+                    break
+
+            key = to_remove.rpartition('/')[2]
+            if key not in self.required_artifacts:
+                size = self.remove(to_remove)
+                if size:
+                    self.cache_size -= size
+
+        # This should be O(1) if implemented correctly
+        return self.calculate_cache_size()
+
     # get_approximate_cache_size()
     #
     # A cheap method that aims to serve as an upper limit on the
@@ -216,6 +287,17 @@ class ArtifactCache():
     # Abstract methods for subclasses to implement #
     ################################################
 
+    # update_atime()
+    #
+    # Update the atime of an artifact.
+    #
+    # Args:
+    #     key (str): The key of the artifact.
+    #
+    def update_atime(self, key):
+        raise ImplError("Cache '{kind}' does not implement update_atime()"
+                        .format(kind=type(self).__name__))
+
     # initialize_remotes():
     #
     # This will contact each remote cache.
@@ -241,6 +323,32 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement contains()"
                         .format(kind=type(self).__name__))
 
+    # list_artifacts():
+    #
+    # List artifacts in this cache in LRU order.
+    #
+    # Returns:
+    #     ([str]) - A list of artifact names as generated by
+    #               `ArtifactCache.get_artifact_fullname` in LRU order
+    #
+    def list_artifacts(self):
+        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
+                        .format(kind=type(self).__name__))
+
+    # remove():
+    #
+    # Removes the artifact for the specified ref from the local
+    # artifact cache.
+    #
+    # Args:
+    #     artifact_name (str): The name of the artifact to remove (as
+    #                          generated by
+    #                          `ArtifactCache.get_artifact_fullname`)
+    #
+    def remove(self, artifact_name):
+        raise ImplError("Cache '{kind}' does not implement remove()"
+                        .format(kind=type(self).__name__))
+
     # extract():
     #
     # Extract cached artifact for the specified Element if it hasn't
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index 4f1d8ac..1e84c28 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -450,6 +450,12 @@ class CASCache(ArtifactCache):
         except FileNotFoundError as e:
             raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
 
+    def update_atime(self, ref):
+        try:
+            os.utime(self._refpath(ref))
+        except FileNotFoundError as e:
+            raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
+
     def calculate_cache_size(self):
         if self.cache_size is None:
             self.cache_size = utils._get_dir_size(self.casdir)
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 9815586..185d825 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1,2 +1,3 @@
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
+from .cleanupjob import CleanupJob
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
new file mode 100644
index 0000000..3a91e91
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -0,0 +1,72 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message
+
+
+class CleanupJob(Job):
+    def __init__(self, *args, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete_cb = complete_cb
+        self._cache = Platform._instance.artifactcache
+
+    def _child_process(self):
+        return self._cache.clean()
+
+    def _parent_complete(self, success, result):
+        self._cache._set_cache_size(result)
+        if self._complete_cb:
+            self._complete_cb()
+
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+        yield self._logfile
+        self._logfile = None
+
+    # message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(Message(None, message_type, message, **args))
+
+    def _child_log(self, message):
+        message.action_name = self.action_name
+
+        with open(self._logfile, 'a+') as log:
+            message_text = self.decorate_message(message, '[cleanup]')
+            log.write('{}\n'.format(message_text))
+            log.flush()
+
+        return message
+
+    def _child_process_data(self):
+        return {}
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index a11134c..aeb3293 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -28,7 +28,7 @@ from contextlib import contextmanager
 
 # Local imports
 from .resources import Resources, ResourceType
-from .jobs import CacheSizeJob
+from .jobs import CacheSizeJob, CleanupJob
 
 
 # A decent return code for Scheduler.run()
@@ -313,13 +313,25 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()
 
+    def _run_cleanup(self, cache_size):
+        if cache_size and cache_size < self.context.cache_quota:
+            return
+
+        logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log')
+        job = CleanupJob(self, 'cleanup', logpath,
+                         resources=[ResourceType.CACHE,
+                                    ResourceType.PROCESS],
+                         exclusive_resources=[ResourceType.CACHE],
+                         complete_cb=None)
+        self.schedule_jobs([job])
+
     def _check_cache_size_real(self):
         logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
         job = CacheSizeJob(self, 'cache_size', logpath,
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            exclusive_resources=[ResourceType.CACHE],
-                           complete_cb=None)
+                           complete_cb=self._run_cleanup)
         self.schedule_jobs([job])
 
     # _suspend_jobs()
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index a29d8b6..34baa0b 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -876,6 +876,14 @@ class Stream():
                                                   selected,
                                                   except_elements)
 
+        # Set the "required" artifacts that should not be removed
+        # while this pipeline is active
+        #
+        # FIXME: The set of required artifacts should probably be
+        #        what's in `selected`, but this does not seem to work
+        #        for some reason
+        self._artifacts.append_required_artifacts(self._pipeline.dependencies(elements, Scope.ALL))
+
         if selection == PipelineSelection.PLAN and dynamic_plan:
             # We use a dynamic build plan, only request artifacts of top-level targets,
             # others are requested dynamically as needed.
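
For reference, the hand-off wired up in scheduler.py above boils down to
the following flow. This is a simplified, runnable sketch; FakeCache and
the free functions are hypothetical stand-ins for the real CacheSizeJob
and CleanupJob machinery:

    class FakeCache:
        def __init__(self, size, quota):
            self.size = size
            self.quota = quota

        def calculate_cache_size(self):
            return self.size

        def clean(self):
            # Pretend cleanup frees space down to half of the quota
            self.size = min(self.size, self.quota // 2)
            return self.size

    def check_cache_size(cache, complete_cb):
        # Stands in for CacheSizeJob with complete_cb=Scheduler._run_cleanup
        complete_cb(cache.calculate_cache_size())

    def run_cleanup(cache, cache_size):
        # Stands in for Scheduler._run_cleanup(): only trigger a cleanup
        # once the measured size has reached the quota
        if cache_size and cache_size < cache.quota:
            return
        print("cleaned down to", cache.clean())

    cache = FakeCache(size=100, quota=80)
    check_cache_size(cache, lambda size: run_cleanup(cache, size))
    # -> cleaned down to 40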