You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:13 UTC
[buildstream] 15/32: Don't expire artifacts that are required for the pipeline
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit e1cf61345851a5aeb5981b1f70c2ab1434eded34
Author: Tristan Maat <tm...@tlater.net>
AuthorDate: Wed Mar 28 15:03:42 2018 +0000
Don't expire artifacts that are required for the pipeline
---
buildstream/_artifactcache/artifactcache.py | 50 ++++++++++++++++++++++++++++-
buildstream/_artifactcache/ostreecache.py | 12 +++++--
buildstream/_artifactcache/tarcache.py | 2 +-
buildstream/_pipeline.py | 2 ++
buildstream/_scheduler/queues/buildqueue.py | 2 ++
buildstream/element.py | 10 ++++++
6 files changed, 73 insertions(+), 5 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
old mode 100644
new mode 100755
index a7bf679..bf8ff4a
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -21,6 +21,7 @@ import os
import string
from collections import Mapping, namedtuple
+from ..element import _KeyStrength
from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
from .._message import Message, MessageType
from .. import utils
@@ -61,6 +62,7 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')):
class ArtifactCache():
def __init__(self, context):
self.context = context
+ self.required_artifacts = set()
self.extractdir = os.path.join(context.artifactdir, 'extract')
self.max_size = context.cache_quota
self.estimated_size = None
@@ -165,6 +167,38 @@ class ArtifactCache():
(str(provenance)))
return cache_specs
+ # append_required_artifacts():
+ #
+ # Append to the list of elements whose artifacts are required for
+ # the current run. Artifacts whose elements are in this list will
+ # be locked by the artifact cache and not touched for the duration
+ # of the current pipeline.
+ #
+ # Args:
+ # elements (iterable): A set of elements to mark as required
+ #
+ def append_required_artifacts(self, elements):
+ # We lock both strong and weak keys - deleting one but not the
+ # other won't save space in most cases anyway, but would be a
+ # user inconvenience.
+
+ for element in elements:
+ strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
+ weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
+
+ for key in (strong_key, weak_key):
+ if key and key not in self.required_artifacts:
+ self.required_artifacts.add(key)
+
+ # We also update the usage times of any artifacts
+ # we will be using, which helps prevent a
+ # buildstream process that runs in parallel with
+ # this one from removing artifacts in use.
+ try:
+ self.update_atime(element, key)
+ except FileNotFoundError:
+ pass
+
# clean():
#
# Clean the artifact cache as much as possible.
@@ -193,7 +227,9 @@ class ArtifactCache():
else:
break
- self.remove(to_remove)
+ key = to_remove.rpartition('/')[2]
+ if key not in self.required_artifacts:
+ self.remove(to_remove)
# This should be O(1) if implemented correctly
return self.calculate_cache_size()
@@ -282,6 +318,18 @@ class ArtifactCache():
raise ImplError("Cache '{kind}' does not implement remove()"
.format(kind=type(self).__name__))
+ # update_atime():
+ #
+ # Update the access time of an element.
+ #
+ # Args:
+ # element (Element): The Element to mark
+ # key (str): The cache key to use
+ #
+ def update_atime(self, element, key):
+ raise ImplError("Cache '{kind}' does not implement update_atime()"
+ .format(kind=type(self).__name__))
+
# extract():
#
# Extract cached artifact for the specified Element if it hasn't
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
index e0bbc3c..bb8f4fc 100755
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -103,6 +103,11 @@ class OSTreeCache(ArtifactCache):
# correct number of artifacts.
self.cache_size -= _ostree.remove(self.repo, artifact_name, defer_prune=False)
+ def update_atime(self, element, key):
+ ref = self.get_artifact_fullname(element, key)
+ ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
+ os.utime(ref_file)
+
def extract(self, element, key):
ref = self.get_artifact_fullname(element, key)
@@ -150,9 +155,7 @@ class OSTreeCache(ArtifactCache):
except OSTreeError as e:
raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
- for ref in refs:
- ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
- os.utime(ref_file)
+ self.append_required_artifacts([element])
self.cache_size = None
@@ -189,6 +192,9 @@ class OSTreeCache(ArtifactCache):
# fetch the artifact from highest priority remote using the specified cache key
remote_name = self._ensure_remote(self.repo, remote.pull_url)
_ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
+
+ self.append_required_artifacts([element])
+
return True
except OSTreeError:
# Try next remote
diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py
index ad41e42..4e9f5f9 100644
--- a/buildstream/_artifactcache/tarcache.py
+++ b/buildstream/_artifactcache/tarcache.py
@@ -86,7 +86,7 @@ class TarCache(ArtifactCache):
_Tar.archive(os.path.join(self.tardir, ref), key, temp)
self.cache_size = None
- os.utime(os.path.join(self.tardir, ref))
+ self.append_required_artifacts([element])
# update_atime():
#
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 9f4504d..7f159c7 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -159,6 +159,8 @@ class Pipeline():
# Determine initial element state.
element._update_state()
+ self._artifacts.append_required_artifacts((e for e in self.dependencies(targets, Scope.ALL)))
+
# dependencies()
#
# Generator function to iterate over elements and optionally
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index a7e8e32..9b2cbe6 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -72,6 +72,8 @@ class BuildQueue(Queue):
# Inform element in main process that assembly is done
element._assemble_done()
+ # This has to be done after _assemble_done, such that the
+ # element may register its cache key as required
self._check_cache_size(job, element)
return True
diff --git a/buildstream/element.py b/buildstream/element.py
index c5b62e4..518fb59 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1398,6 +1398,16 @@ class Element(Plugin):
workspace.clear_running_files()
self._get_context().get_workspaces().save_config()
+ # We also need to update the required artifacts, since
+ # workspaced dependencies do not have a fixed cache key
+ # when the build starts.
+ #
+ # This does *not* cause a race condition, because
+ # _assemble_done is called before a cleanup job may be
+ # launched.
+ #
+ self.__artifacts.append_required_artifacts([self])
+
# _assemble():
#
# Internal method for running the entire build phase.