You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:11 UTC

[buildstream] 13/32: Automatically delete artifacts when we run out of space

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9e9dad60a302c2d51d736af8621d18c93416222e
Author: Tristan Maat <tm...@tlater.net>
AuthorDate: Sat Mar 24 17:56:56 2018 +0000

    Automatically delete artifacts when we run out of space
---
 buildstream/_artifactcache/artifactcache.py | 47 ++++++++++++++++++-
 buildstream/_artifactcache/ostreecache.py   | 18 +++++++-
 buildstream/_artifactcache/tarcache.py      | 31 +++++++++++++
 buildstream/_exceptions.py                  |  4 +-
 buildstream/_ostree.py                      |  2 +
 buildstream/_scheduler/jobs/__init__.py     |  1 +
 buildstream/_scheduler/jobs/cleanupjob.py   | 71 +++++++++++++++++++++++++++++
 buildstream/_scheduler/jobs/job.py          |  1 +
 buildstream/_scheduler/queues/buildqueue.py |  2 +-
 buildstream/_scheduler/scheduler.py         |  1 +
 buildstream/element.py                      |  2 +-
 buildstream/utils.py                        | 14 +++---
 12 files changed, 181 insertions(+), 13 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 3b2aaf5..a7bf679 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -21,7 +21,7 @@ import os
 import string
 from collections import Mapping, namedtuple
 
-from .._exceptions import ImplError, LoadError, LoadErrorReason
+from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
 from .._message import Message, MessageType
 from .. import utils
 from .. import _yaml
@@ -165,6 +165,39 @@ class ArtifactCache():
                                   (str(provenance)))
         return cache_specs
 
+    # clean():
+    #
+    # Clean the artifact cache as much as possible.
+    #
+    def clean(self):
+        artifacts = self.list_artifacts()
+
+        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
+            try:
+                to_remove = artifacts.pop(0)
+            except IndexError:
+                # If too many artifacts are required, and we therefore
+                # can't remove them, we have to abort the build.
+                #
+                # FIXME: Asking the user what to do may be neater
+                default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
+                                            'buildstream.conf')
+                detail = ("There is not enough space to build the given element.\n"
+                          "Please increase the cache-quota in {}."
+                          .format(self.context.config_origin or default_conf))
+
+                if self.calculate_cache_size() > self.context.cache_quota:
+                    raise ArtifactError("Cache too full. Aborting.",
+                                        detail=detail,
+                                        reason="cache-too-full")
+                else:
+                    break
+
+            self.remove(to_remove)
+
+        # This should be O(1) if implemented correctly
+        return self.calculate_cache_size()
+
     # get_approximate_cache_size()
     #
     # A cheap method that aims to serve as an upper limit on the
@@ -223,6 +256,18 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement contains()"
                         .format(kind=type(self).__name__))
 
+    # list_artifacts():
+    #
+    # List artifacts in this cache in LRU order.
+    #
+    # Returns:
+    #     ([str]) - A list of artifact names as generated by
+    #               `ArtifactCache.get_artifact_fullname` in LRU order
+    #
+    def list_artifacts(self):
+        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
+                        .format(kind=type(self).__name__))
+
     # remove():
     #
     # Removes the artifact for the specified ref from the local
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
index 8d22917..e0bbc3c 100755
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -93,8 +93,15 @@ class OSTreeCache(ArtifactCache):
         ref = self.get_artifact_fullname(element, key)
         return _ostree.exists(self.repo, ref)
 
-    def remove(self, ref):
-        return _ostree.remove(self.repo, ref)
+    def list_artifacts(self):
+        return _ostree.list_artifacts(self.repo)
+
+    def remove(self, artifact_name):
+        # We cannot defer pruning, unfortunately, because we could
+        # otherwise not figure out how much space was freed by the
+        # removal, and would therefore not be able to expire the
+        # correct number of artifacts.
+        self.cache_size -= _ostree.remove(self.repo, artifact_name, defer_prune=False)
 
     def extract(self, element, key):
         ref = self.get_artifact_fullname(element, key)
@@ -105,6 +112,9 @@ class OSTreeCache(ArtifactCache):
         # Extracting a nonexistent artifact is a bug
         assert rev, "Artifact missing for {}".format(ref)
 
+        ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
+        os.utime(ref_file)
+
         dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev)
         if os.path.isdir(dest):
             # artifact has already been extracted
@@ -140,6 +150,10 @@ class OSTreeCache(ArtifactCache):
         except OSTreeError as e:
             raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
 
+        for ref in refs:
+            ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
+            os.utime(ref_file)
+
         self.cache_size = None
 
     def can_diff(self):
diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py
index 3cae9d9..ad41e42 100644
--- a/buildstream/_artifactcache/tarcache.py
+++ b/buildstream/_artifactcache/tarcache.py
@@ -45,6 +45,24 @@ class TarCache(ArtifactCache):
         path = os.path.join(self.tardir, _tarpath(element, key))
         return os.path.isfile(path)
 
+    # list_artifacts():
+    #
+    # List artifacts in this cache in LRU order.
+    #
+    # Returns:
+    #     (list) - A list of refs in LRU order
+    #
+    def list_artifacts(self):
+        artifacts = list(utils.list_relative_paths(self.tardir, list_dirs=False))
+        mtimes = [os.path.getmtime(os.path.join(self.tardir, artifact))
+                  for artifact in artifacts if artifact]
+
+        # We need to get rid of the tarfile extension to get a proper
+    # ref - os.path.splitext doesn't do this properly, unfortunately
+    # (it only strips the final '.bz2', leaving '.tar' behind).
+        artifacts = [artifact[:-len('.tar.bz2')] for artifact in artifacts]
+
+        return [name for _, name in sorted(zip(mtimes, artifacts))]
+
     # remove()
     #
     # Implements artifactcache.remove().
@@ -68,6 +86,19 @@ class TarCache(ArtifactCache):
                 _Tar.archive(os.path.join(self.tardir, ref), key, temp)
 
             self.cache_size = None
+            os.utime(os.path.join(self.tardir, ref))
+
+    # update_atime():
+    #
+    # Update the access time of an element.
+    #
+    # Args:
+    #     element (Element): The Element to mark
+    #     key (str): The cache key to use
+    #
+    def update_atime(self, element, key):
+        path = _tarpath(element, key)
+        os.utime(os.path.join(self.tardir, path))
 
     def extract(self, element, key):
 
diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index 3aadd52..96d634b 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -246,8 +246,8 @@ class SandboxError(BstError):
 # Raised when errors are encountered in the artifact caches
 #
 class ArtifactError(BstError):
-    def __init__(self, message, reason=None):
-        super().__init__(message, domain=ErrorDomain.ARTIFACT, reason=reason)
+    def __init__(self, message, *, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason)
 
 
 # PipelineError
diff --git a/buildstream/_ostree.py b/buildstream/_ostree.py
index 238c6b4..d060960 100644
--- a/buildstream/_ostree.py
+++ b/buildstream/_ostree.py
@@ -565,6 +565,8 @@ def list_artifacts(repo):
     ref_heads = os.path.join(repo.get_path().get_path(), 'refs', 'heads')
 
     # obtain list of <project>/<element>/<key>
+    # FIXME: ostree 2017.11+ supports a flag that would allow
+    #        listing only local refs
     refs = _list_all_refs(repo).keys()
 
     mtimes = []
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 9815586..185d825 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1,2 +1,3 @@
 from .elementjob import ElementJob
 from .cachesizejob import CacheSizeJob
+from .cleanupjob import CleanupJob
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
new file mode 100644
index 0000000..3ae635a
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -0,0 +1,71 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message
+
+
+class CleanupJob(Job):
+    def __init__(self, *args, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete_cb = complete_cb
+        self._cache = Platform._instance.artifactcache
+
+    def _child_process(self):
+        return self._cache.clean()
+
+    def _parent_complete(self, success, result):
+        self._cache._set_cache_size(result)
+        self._complete_cb()
+
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+        yield self._logfile
+        self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(Message(None, message_type, message, **args))
+
+    def _child_log(self, message):
+        message.action_name = self.action_name
+
+        with open(self._logfile, 'a+') as log:
+            message_text = self._format_frontend_message(message, '[cleanup]')
+            log.write('{}\n'.format(message_text))
+            log.flush()
+
+        return message
+
+    def _child_process_data(self):
+        return {}
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index fb25901..01b7107 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -59,6 +59,7 @@ class JobType():
     PULL = 4
     PUSH = 5
     SIZE = 6
+    CLEAN = 7
 
 
 # Job()
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index c023973..a7e8e32 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -20,7 +20,7 @@
 
 import os
 from . import Queue, QueueStatus, QueueType
-from ..jobs import CacheSizeJob, JobType
+from ..jobs import CacheSizeJob, CleanupJob, JobType
 
 
 # A queue which assembles elements
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index c33c9d1..ec73620 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -146,6 +146,7 @@ class Scheduler():
         self._starttime = start_time
         self._suspendtime = None
         self._queue_jobs = True      # Whether we should continue to queue jobs
+        self._start_cleanup = False  # Whether we would like to launch a cleanup job
 
         # Initialize task tokens with the number allowed by
         # the user configuration
diff --git a/buildstream/element.py b/buildstream/element.py
index 98a39a4..c5b62e4 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1525,8 +1525,8 @@ class Element(Plugin):
                 }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 
                 with self.timed_activity("Caching artifact"):
-                    self.__artifacts._commit(self, assembledir, self.__get_cache_keys_for_commit())
                     self.__artifact_size = utils._get_dir_size(assembledir)
+                    self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
 
             # Finally cleanup the build dir
             cleanup_rootdir()
diff --git a/buildstream/utils.py b/buildstream/utils.py
index 1cdf575..e8270d8 100644
--- a/buildstream/utils.py
+++ b/buildstream/utils.py
@@ -96,7 +96,7 @@ class FileListResult():
         return ret
 
 
-def list_relative_paths(directory):
+def list_relative_paths(directory, *, list_dirs=True):
     """A generator for walking directory relative paths
 
     This generator is useful for checking the full manifest of
@@ -110,6 +110,7 @@ def list_relative_paths(directory):
 
     Args:
        directory (str): The directory to list files in
+       list_dirs (bool): Whether to list directories
 
     Yields:
        Relative filenames in `directory`
@@ -136,15 +137,16 @@ def list_relative_paths(directory):
         # subdirectories in the walked `dirpath`, so we extract
         # these symlinks from `dirnames`
         #
-        for d in dirnames:
-            fullpath = os.path.join(dirpath, d)
-            if os.path.islink(fullpath):
-                yield os.path.join(basepath, d)
+        if list_dirs:
+            for d in dirnames:
+                fullpath = os.path.join(dirpath, d)
+                if os.path.islink(fullpath):
+                    yield os.path.join(basepath, d)
 
         # We've decended into an empty directory, in this case we
         # want to include the directory itself, but not in any other
         # case.
-        if not filenames:
+        if list_dirs and not filenames:
             yield relpath
 
         # List the filenames in the walked directory