Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:13 UTC

[buildstream] 15/32: Don't expire artifacts that are required for the pipeline

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit e1cf61345851a5aeb5981b1f70c2ab1434eded34
Author: Tristan Maat <tm...@tlater.net>
AuthorDate: Wed Mar 28 15:03:42 2018 +0000

    Don't expire artifacts that are required for the pipeline
---
 buildstream/_artifactcache/artifactcache.py | 50 ++++++++++++++++++++++++++++-
 buildstream/_artifactcache/ostreecache.py   | 12 +++++--
 buildstream/_artifactcache/tarcache.py      |  2 +-
 buildstream/_pipeline.py                    |  2 ++
 buildstream/_scheduler/queues/buildqueue.py |  2 ++
 buildstream/element.py                      | 10 ++++++
 6 files changed, 73 insertions(+), 5 deletions(-)
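
A rough standalone sketch of the idea behind this commit (not part of the
commit itself; the class, the quota logic, and all values are illustrative
only):

    # Artifacts whose cache keys are marked as required are never
    # candidates for expiry, no matter how much space the cache needs
    # to reclaim.
    class ToyArtifactCache:
        def __init__(self, quota):
            self.quota = quota
            self.required_artifacts = set()   # pinned cache keys
            self.artifacts = {}               # key -> size, oldest first

        def append_required_artifacts(self, keys):
            self.required_artifacts.update(k for k in keys if k)

        def clean(self):
            # Evict least-recently-used artifacts until under quota,
            # skipping anything the current run still needs.
            size = sum(self.artifacts.values())
            for key in list(self.artifacts):
                if size <= self.quota:
                    break
                if key in self.required_artifacts:
                    continue                  # locked for the current run
                size -= self.artifacts.pop(key)
            return size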

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
old mode 100644
new mode 100755
index a7bf679..bf8ff4a
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -21,6 +21,7 @@ import os
 import string
 from collections import Mapping, namedtuple
 
+from ..element import _KeyStrength
 from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
 from .._message import Message, MessageType
 from .. import utils
@@ -61,6 +62,7 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')):
 class ArtifactCache():
     def __init__(self, context):
         self.context = context
+        self.required_artifacts = set()
         self.extractdir = os.path.join(context.artifactdir, 'extract')
         self.max_size = context.cache_quota
         self.estimated_size = None
@@ -165,6 +167,38 @@ class ArtifactCache():
                                   (str(provenance)))
         return cache_specs
 
+    # append_required_artifacts():
+    #
+    # Mark the artifacts of the given elements as required for the
+    # current run. Required artifacts are locked by the artifact
+    # cache and will not be removed for the duration of the current
+    # pipeline.
+    #
+    # Args:
+    #     elements (iterable): The elements to mark as required
+    #
+    def append_required_artifacts(self, elements):
+        # We lock both strong and weak keys; deleting one but not the
+        # other won't save space in most cases anyway, and would be an
+        # inconvenience to the user.
+
+        for element in elements:
+            strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
+            weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
+
+            for key in (strong_key, weak_key):
+                if key and key not in self.required_artifacts:
+                    self.required_artifacts.add(key)
+
+                    # We also update the usage times of any artifacts
+                    # we will be using, which helps prevent a
+                    # BuildStream process running in parallel with
+                    # this one from removing artifacts that are in use.
+                    try:
+                        self.update_atime(element, key)
+                    except FileNotFoundError:
+                        pass
+
     # clean():
     #
     # Clean the artifact cache as much as possible.
@@ -193,7 +227,9 @@ class ArtifactCache():
                 else:
                     break
 
-            self.remove(to_remove)
+            key = to_remove.rpartition('/')[2]
+            if key not in self.required_artifacts:
+                self.remove(to_remove)
 
         # This should be O(1) if implemented correctly
         return self.calculate_cache_size()
@@ -282,6 +318,18 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement remove()"
                         .format(kind=type(self).__name__))
 
+    # update_atime():
+    #
+    # Update the access time of an element's artifact.
+    #
+    # Args:
+    #     element (Element): The element whose artifact to touch
+    #     key (str): The cache key to use
+    #
+    def update_atime(self, element, key):
+        raise ImplError("Cache '{kind}' does not implement update_atime()"
+                        .format(kind=type(self).__name__))
+
     # extract():
     #
     # Extract cached artifact for the specified Element if it hasn't
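
The guard added to clean() relies on artifact refs ending in the cache
key. A small illustration (the "project/element/key" shape is an
assumption implied by the rpartition('/') call; the ref value is made up):

    # rpartition('/') recovers the key as the part after the last slash.
    to_remove = "myproject/hello.bst/8d2c6a19"   # illustrative ref
    key = to_remove.rpartition('/')[2]           # -> "8d2c6a19"

    required_artifacts = {"8d2c6a19"}
    if key not in required_artifacts:
        print("expiring", to_remove)
    else:
        print("keeping required artifact", to_remove)
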
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
index e0bbc3c..bb8f4fc 100755
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -103,6 +103,11 @@ class OSTreeCache(ArtifactCache):
         # correct number of artifacts.
         self.cache_size -= _ostree.remove(self.repo, artifact_name, defer_prune=False)
 
+    def update_atime(self, element, key):
+        ref = self.get_artifact_fullname(element, key)
+        ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
+        os.utime(ref_file)
+
     def extract(self, element, key):
         ref = self.get_artifact_fullname(element, key)
 
@@ -150,9 +155,7 @@ class OSTreeCache(ArtifactCache):
         except OSTreeError as e:
             raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
 
-        for ref in refs:
-            ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
-            os.utime(ref_file)
+        self.append_required_artifacts([element])
 
         self.cache_size = None
 
@@ -189,6 +192,9 @@ class OSTreeCache(ArtifactCache):
                 # fetch the artifact from highest priority remote using the specified cache key
                 remote_name = self._ensure_remote(self.repo, remote.pull_url)
                 _ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
+
+                self.append_required_artifacts([element])
+
                 return True
             except OSTreeError:
                 # Try next remote
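
The OSTree implementation of update_atime() above simply touches the ref
file on disk. A hedged sketch of the same operation (the paths are
illustrative; the real code resolves them from the repo object):

    import os

    repo_path = "/home/user/.cache/buildstream/artifacts/ostree"  # assumed
    ref = "myproject/hello.bst/8d2c6a19"                          # assumed
    ref_file = os.path.join(repo_path, "refs", "heads", ref)

    # os.utime() with no 'times' argument sets both atime and mtime to
    # the current time, so a parallel BuildStream process treating the
    # timestamp as an LRU signal will see this artifact as in use.
    try:
        os.utime(ref_file)
    except FileNotFoundError:
        pass   # not in the local cache; nothing to touch
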
diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py
index ad41e42..4e9f5f9 100644
--- a/buildstream/_artifactcache/tarcache.py
+++ b/buildstream/_artifactcache/tarcache.py
@@ -86,7 +86,7 @@ class TarCache(ArtifactCache):
                 _Tar.archive(os.path.join(self.tardir, ref), key, temp)
 
             self.cache_size = None
-            os.utime(os.path.join(self.tardir, ref))
+            self.append_required_artifacts([element])
 
     # update_atime():
     #
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 9f4504d..7f159c7 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -159,6 +159,8 @@ class Pipeline():
                 # Determine initial element state.
                 element._update_state()
 
+            self._artifacts.append_required_artifacts(self.dependencies(targets, Scope.ALL))
+
     # dependencies()
     #
     # Generator function to iterate over elements and optionally
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index a7e8e32..9b2cbe6 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -72,6 +72,8 @@ class BuildQueue(Queue):
             # Inform element in main process that assembly is done
             element._assemble_done()
 
+        # This has to be done after _assemble_done, so that the
+        # element can register its cache key as required
         self._check_cache_size(job, element)
 
         return True
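
The ordering constraint in that comment can be made concrete (a sketch;
the job/element handling here is a simplified stand-in for the real
Queue.done() machinery):

    def done(self, job, element, result, success):
        # Wrong order (hypothetical): a cleanup launched by the size
        # check could evict this element's freshly cached artifact,
        # because its strong cache key is not yet in required_artifacts:
        #
        #     self._check_cache_size(job, element)
        #     element._assemble_done()
        #
        # Correct order, as in the hunk above:
        element._assemble_done()               # pins the element's cache keys
        self._check_cache_size(job, element)   # cleanup now respects the pin
        return True
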
diff --git a/buildstream/element.py b/buildstream/element.py
index c5b62e4..518fb59 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1398,6 +1398,16 @@ class Element(Plugin):
             workspace.clear_running_files()
             self._get_context().get_workspaces().save_config()
 
+            # We also need to update the required artifacts, since
+            # workspaced dependencies do not have a fixed cache key
+            # when the build starts.
+            #
+            # This does *not* cause a race condition, because
+            # _assemble_done is called before any cleanup job can be
+            # launched.
+            #
+            self.__artifacts.append_required_artifacts([self])
+
     # _assemble():
     #
     # Internal method for running the entire build phase.
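
The comment in that last hunk compresses a subtle point; restated as a
sketch (the bullets paraphrase the diff, and the idempotence claim follows
from the "key not in self.required_artifacts" check in
append_required_artifacts()):

    # Conceptually, inside Element._assemble_done():
    #
    #   - the weak cache key is known before the build and was pinned
    #     when the pipeline loaded its targets' dependencies,
    #   - but for workspaced dependencies the strong key only exists
    #     once the build output is known, so it must be pinned here.
    #
    # Re-pinning is cheap: append_required_artifacts() skips keys that
    # are already in the set.
    self.__artifacts.append_required_artifacts([self])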