You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:08 UTC
[buildstream] 10/32: Calculate the artifact cache size
This is an automated email from the ASF dual-hosted git repository.
root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 66b25acd197b4739a2db3b27d12a423b18348601
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Mar 20 09:24:01 2018 +0000
Calculate the artifact cache size
---
buildstream/_artifactcache/artifactcache.py | 70 ++++++++++++++++++++++
buildstream/_artifactcache/ostreecache.py | 12 ++++
buildstream/_artifactcache/tarcache.py | 20 ++++++-
buildstream/_scheduler/jobs/__init__.py | 1 +
buildstream/_scheduler/jobs/cachesizejob.py | 91 +++++++++++++++++++++++++++++
buildstream/_scheduler/jobs/elementjob.py | 6 ++
buildstream/_scheduler/jobs/job.py | 1 +
buildstream/_scheduler/queues/buildqueue.py | 19 +++++-
buildstream/_scheduler/queues/fetchqueue.py | 2 +-
buildstream/_scheduler/queues/pullqueue.py | 7 ++-
buildstream/_scheduler/queues/pushqueue.py | 2 +-
buildstream/_scheduler/queues/queue.py | 21 ++++---
buildstream/_scheduler/queues/trackqueue.py | 2 +-
buildstream/element.py | 23 +++++++-
14 files changed, 260 insertions(+), 17 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index e9611bf..3b2aaf5 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -62,6 +62,9 @@ class ArtifactCache():
def __init__(self, context):
self.context = context
self.extractdir = os.path.join(context.artifactdir, 'extract')
+ self.max_size = context.cache_quota
+ self.estimated_size = None
+
self.global_remote_specs = []
self.project_remote_specs = {}
@@ -162,6 +165,35 @@ class ArtifactCache():
(str(provenance)))
return cache_specs
+ # get_approximate_cache_size()
+ #
+ # A cheap method that aims to serve as an upper limit on the
+ # artifact cache size.
+ #
+ # The cache size reported by this function will normally be larger
+ # than the real cache size, since it is calculated using the
+ # pre-commit artifact size, but for very small artifacts in
+ # certain caches additional overhead could cause this to be
+ # smaller than, but close to, the actual size.
+ #
+ # Nonetheless, in practice this should be safe to use as an upper
+ # limit on the cache size.
+ #
+ # If the cache has built-in constant-time size reporting, please
+ # feel free to override this method with a more accurate
+ # implementation.
+ #
+ # Returns:
+ # (int) An approximation of the artifact cache size.
+ #
+ def get_approximate_cache_size(self):
+ # If we don't currently have an estimate, figure out the real
+ # cache size.
+ if self.estimated_size is None:
+ self.estimated_size = self.calculate_cache_size()
+
+ return self.estimated_size
+
################################################
# Abstract methods for subclasses to implement #
################################################
@@ -334,6 +366,20 @@ class ArtifactCache():
raise ImplError("Cache '{kind}' does not implement link_key()"
.format(kind=type(self).__name__))
+ # calculate_cache_size()
+ #
+ # Return the real artifact cache size.
+ #
+ # Implementations should also use this to update estimated_size.
+ #
+ # Returns:
+ #
+ # (int) The size of the artifact cache.
+ #
+ def calculate_cache_size(self):
+ raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
+ .format(kind=type(self).__name__))
+
################################################
# Local Private Methods #
################################################
@@ -375,6 +421,30 @@ class ArtifactCache():
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
+ # _add_artifact_size()
+ #
+ # Since we cannot keep track of the cache size between processes,
+ # this method will be called by the main process every time a
+ # process that added something to the cache finishes.
+ #
+ # This will then add the reported size to
+ # ArtifactCache.estimated_size.
+ #
+ def _add_artifact_size(self, artifact_size):
+ if not self.estimated_size:
+ self.estimated_size = self.calculate_cache_size()
+
+ self.estimated_size += artifact_size
+
+ # _set_cache_size()
+ #
+ # Similarly to the above method, when we calculate the actual size
+ # in a child process, we can't update it. We instead pass the value
+ # back to the main process and update it there.
+ #
+ def _set_cache_size(self, cache_size):
+ self.estimated_size = cache_size
+
# _configured_remote_artifact_cache_specs():
#
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
old mode 100644
new mode 100755
index f71bee0..8d22917
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -59,6 +59,9 @@ class OSTreeCache(ArtifactCache):
self._has_fetch_remotes = False
self._has_push_remotes = False
+ # Memoized on-disk size of the OSTree repo; reset to None after each commit
+ self.cache_size = None
+
################################################
# Implementation of abstract methods #
################################################
@@ -137,6 +140,8 @@ class OSTreeCache(ArtifactCache):
except OSTreeError as e:
raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
+ self.cache_size = None
+
def can_diff(self):
return True
@@ -204,6 +209,13 @@ class OSTreeCache(ArtifactCache):
return any_pushed
+ def calculate_cache_size(self):
+ if self.cache_size is None:
+ self.cache_size = utils._get_dir_size(self.repo.get_path().get_path())
+ self.estimated_size = self.cache_size
+
+ return self.cache_size
+
def initialize_remotes(self, *, on_failure=None):
remote_specs = self.global_remote_specs.copy()
diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py
index d995929..3cae9d9 100644
--- a/buildstream/_artifactcache/tarcache.py
+++ b/buildstream/_artifactcache/tarcache.py
@@ -36,6 +36,7 @@ class TarCache(ArtifactCache):
self.tardir = os.path.join(context.artifactdir, 'tar')
os.makedirs(self.tardir, exist_ok=True)
+ self.cache_size = None
################################################
# Implementation of abstract methods #
@@ -52,7 +53,7 @@ class TarCache(ArtifactCache):
artifact = os.path.join(self.tardir, artifact_name + '.tar.bz2')
size = os.stat(artifact, follow_symlinks=False).st_size
os.remove(artifact)
- return size
+ self.cache_size -= size
def commit(self, element, content, keys):
os.makedirs(os.path.join(self.tardir, element._get_project().name, element.normal_name), exist_ok=True)
@@ -66,6 +67,8 @@ class TarCache(ArtifactCache):
_Tar.archive(os.path.join(self.tardir, ref), key, temp)
+ self.cache_size = None
+
def extract(self, element, key):
fullname = self.get_artifact_fullname(element, key)
@@ -100,6 +103,21 @@ class TarCache(ArtifactCache):
return dest
+ # calculate_cache_size()
+ #
+ # Return the real artifact cache size.
+ #
+ # Returns:
+ #
+ # (int) The size of the artifact cache.
+ #
+ def calculate_cache_size(self):
+ if self.cache_size is None:
+ self.cache_size = utils._get_dir_size(self.tardir)
+ self.estimated_size = self.cache_size
+
+ return self.cache_size
+
# _tarpath()
#
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 0030f5c..9815586 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1 +1,2 @@
from .elementjob import ElementJob
+from .cachesizejob import CacheSizeJob
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
new file mode 100644
index 0000000..897e896
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -0,0 +1,91 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message, MessageType
+
+
+class CacheSizeJob(Job):
+ def __init__(self, *args, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._complete_cb = complete_cb
+ self._cache = Platform._instance.artifactcache
+
+ def _child_process(self):
+ return self._cache.calculate_cache_size()
+
+ def _parent_complete(self, success, result):
+ self._cache._set_cache_size(result)
+ if self._complete_cb:
+ self._complete_cb(result)
+
+ @contextmanager
+ def _child_logging_enabled(self, logfile):
+ self._logfile = logfile.format(pid=os.getpid())
+ yield self._logfile
+ self._logfile = None
+
+ # _message():
+ #
+ # Sends a message to the frontend
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments
+ #
+ def _message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(Message(None, message_type, message, **args))
+
+ def _child_log(self, message):
+ with open(self._logfile, 'a+') as log:
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+
+ template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ message_text = template.format(timecode=timecode,
+ type=message.message_type.upper(),
+ name='cache_size',
+ message=message.message,
+ detail=detail)
+
+ log.write('{}\n'.format(message_text))
+ log.flush()
+
+ return message
+
+ def _child_process_data(self):
+ return {}
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 36e7c1d..45c16eb 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -210,7 +210,13 @@ class ElementJob(Job):
data = {}
workspace = self._element._get_workspace()
+ artifact_size = self._element._get_artifact_size()
+ cache_size = self._element._get_artifact_cache().cache_size
+
if workspace is not None:
data['workspace'] = workspace.to_dict()
+ if artifact_size is not None:
+ data['artifact_size'] = artifact_size
+ data['cache_size'] = cache_size
return data
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index cf5bf07..fb25901 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -58,6 +58,7 @@ class JobType():
BUILD = 3
PULL = 4
PUSH = 5
+ SIZE = 6
# Job()
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 0c75538..c023973 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -20,7 +20,7 @@
import os
from . import Queue, QueueStatus, QueueType
-from ..jobs import JobType
+from ..jobs import CacheSizeJob, JobType
# A queue which assembles elements
@@ -53,10 +53,25 @@ class BuildQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def _check_cache_size(self, job, element):
+ if not job.child_data:
+ return
+
+ artifact_size = job.child_data.get('artifact_size', False)
+
+ if artifact_size:
+ cache = element._get_artifact_cache()
+ cache._add_artifact_size(artifact_size)
+
+ if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
+ self._scheduler._check_cache_size_real()
+
+ def done(self, job, element, result, success):
if success:
# Inform element in main process that assembly is done
element._assemble_done()
+ self._check_cache_size(job, element)
+
return True
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 2438a9a..a4ec8ad 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -68,7 +68,7 @@ class FetchQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index 1fc4364..20e7843 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -53,13 +53,18 @@ class PullQueue(Queue):
else:
return QueueStatus.SKIP
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
element._pull_done()
+ # Build jobs will check the "approximate" size first. Since we
+ # do not get an artifact size from pull jobs, we have to
+ # actually check the cache size.
+ self._scheduler._check_cache_size_real()
+
# Element._pull() returns True if it downloaded an artifact,
# here we want to appear skipped if we did not download.
return result
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index aa5540e..77f2e02 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -42,7 +42,7 @@ class PushQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 39e431b..2ff10d8 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -145,6 +145,7 @@ class Queue():
# Abstract method for handling a successful job completion.
#
# Args:
+ # job (Job): The job which completed processing
# element (Element): The element which completed processing
# result (any): The return value of the process() implementation
# success (bool): True if the process() implementation did not
@@ -154,7 +155,7 @@ class Queue():
# (bool): True if the element should appear to be processsed,
# Otherwise False will count the element as "skipped"
#
- def done(self, element, result, success):
+ def done(self, job, element, result, success):
pass
#####################################################
@@ -224,6 +225,7 @@ class Queue():
def process_ready(self):
scheduler = self._scheduler
unready = []
+ ready = []
while self._wait_queue and scheduler.get_job_token(self.queue_type):
element = self._wait_queue.popleft()
@@ -248,12 +250,14 @@ class Queue():
action_cb=self.process,
complete_cb=self._job_done,
max_retries=self._max_retries)
- scheduler.job_starting(job, element)
+ ready.append(job)
# These were not ready but were in the beginning, give em
# first priority again next time around
self._wait_queue.extendleft(unready)
+ return ready
+
#####################################################
# Private Methods #
#####################################################
@@ -270,7 +274,7 @@ class Queue():
def _update_workspaces(self, element, job):
workspace_dict = None
if job.child_data:
- workspace_dict = job.child_data['workspace']
+ workspace_dict = job.child_data.get('workspace', None)
# Handle any workspace modifications now
#
@@ -298,17 +302,17 @@ class Queue():
#
def _job_done(self, job, element, success, result):
- # Remove from our jobs
- self.active_jobs.remove(job)
-
- # Update workspaces in the main task before calling any queue implementation
+ # Update values that need to be synchronized in the main task
+ # before calling any queue implementation
self._update_workspaces(element, job)
+ if job.child_data:
+ element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
# Give the result of the job to the Queue implementor,
# and determine if it should be considered as processed
# or skipped.
try:
- processed = self.done(element, result, success)
+ processed = self.done(job, element, result, success)
except BstError as e:
@@ -346,7 +350,6 @@ class Queue():
self.failed_elements.append(element)
# Give the token for this job back to the scheduler
- # immediately before invoking another round of scheduling
self._scheduler.put_job_token(self.queue_type)
# Convenience wrapper for Queue implementations to send
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index a371e52..df3f7b1 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -49,7 +49,7 @@ class TrackQueue(Queue):
return QueueStatus.READY
- def done(self, element, result, success):
+ def done(self, _, element, result, success):
if not success:
return False
diff --git a/buildstream/element.py b/buildstream/element.py
index fc21f80..98a39a4 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -225,6 +225,7 @@ class Element(Plugin):
self.__staged_sources_directory = None # Location where Element.stage_sources() was called
self.__tainted = None # Whether the artifact is tainted and should not be shared
self.__required = False # Whether the artifact is required in the current session
+ self.__artifact_size = None # The size of data committed to the artifact cache
# hash tables of loaded artifact metadata, hashed by key
self.__metadata_keys = {} # Strong and weak keys for this key
@@ -1524,7 +1525,8 @@ class Element(Plugin):
}), os.path.join(metadir, 'workspaced-dependencies.yaml'))
with self.timed_activity("Caching artifact"):
- self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
+ self.__artifacts._commit(self, assembledir, self.__get_cache_keys_for_commit())
+ self.__artifact_size = utils._get_dir_size(assembledir)
# Finally cleanup the build dir
cleanup_rootdir()
@@ -1763,6 +1765,25 @@ class Element(Plugin):
workspaces = self._get_context().get_workspaces()
return workspaces.get_workspace(self._get_full_name())
+ # _get_artifact_size()
+ #
+ # Get the size of the artifact produced by this element in the
+ # current pipeline - if this element has not been assembled or
+ # pulled, this will be None.
+ #
+ # Note that this is the size of an artifact *before* committing it
+ # to the cache, the size on disk may differ. It can act as an
+ # approximate guide for when to do a proper size calculation.
+ #
+ # Returns:
+ # (int|None): The size of the artifact
+ #
+ def _get_artifact_size(self):
+ return self.__artifact_size
+
+ def _get_artifact_cache(self):
+ return self.__artifacts
+
# _write_script():
#
# Writes a script to the given directory.