Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:48:24 UTC

[buildstream] 08/11: Compute the artifact cache size after each build/pull

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 135-expire-artifacts-in-local-cache
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit b722c279698c14cedc8e996dd5eb9abf90231744
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Jul 17 13:02:57 2018 +0100

    Compute the artifact cache size after each build/pull
---
 buildstream/_artifactcache/artifactcache.py |  1 +
 buildstream/_artifactcache/cascache.py      | 11 +++-
 buildstream/_scheduler/jobs/__init__.py     |  1 +
 buildstream/_scheduler/jobs/cachesizejob.py | 91 +++++++++++++++++++++++++++++
 buildstream/_scheduler/queues/buildqueue.py | 17 ++++++
 buildstream/_scheduler/queues/pullqueue.py  |  5 ++
 buildstream/_scheduler/scheduler.py         | 12 +++-
 7 files changed, 136 insertions(+), 2 deletions(-)
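
Taken together, the diff below keeps a cheap running estimate of the cache size while builds complete, and falls back to an exact recount after pulls or once the estimate crosses the configured quota. The sketch below is not BuildStream code; it only models that accounting scheme, and the quota value and the exact_size_fn callable are assumptions for illustration:

    # Illustrative only: a self-contained model of the accounting scheme,
    # not the ArtifactCache/Scheduler API used in the diff below.
    class CacheAccounting:
        def __init__(self, quota, exact_size_fn):
            self.quota = quota                  # stands in for context.cache_quota
            self.exact_size_fn = exact_size_fn  # e.g. a walk over the CAS directory
            self.estimated_size = None

        def on_build_done(self, artifact_size):
            # Cheap path: grow the running estimate by the new artifact's size
            if self.estimated_size is None:
                self.estimated_size = self.exact_size_fn()
            self.estimated_size += artifact_size
            if self.estimated_size > self.quota:
                self.on_pull_done()             # over quota: do an exact recount

        def on_pull_done(self):
            # Pull jobs report no artifact size, so recompute the real figure
            self.estimated_size = self.exact_size_fn()

    accounting = CacheAccounting(quota=10 * 1024 ** 3, exact_size_fn=lambda: 0)
    accounting.on_build_done(512 * 1024 ** 2)   # a 512 MiB artifact was just built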

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 3541f24..9abe68c 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -85,6 +85,7 @@ class ArtifactCache():
         self.project_remote_specs = {}
 
         self._local = False
+        self.cache_size = None
 
         os.makedirs(context.artifactdir, exist_ok=True)
 
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index f202507..4f1d8ac 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -77,7 +77,7 @@ class CASCache(ArtifactCache):
     def extract(self, element, key):
         ref = self.get_artifact_fullname(element, key)
 
-        tree = self.resolve_ref(ref)
+        tree = self.resolve_ref(ref, update_mtime=True)
 
         dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, tree.hash)
         if os.path.isdir(dest):
@@ -113,6 +113,8 @@ class CASCache(ArtifactCache):
         for ref in refs:
             self.set_ref(ref, tree)
 
+        self.cache_size = None
+
     def diff(self, element, key_a, key_b, *, subdir=None):
         ref_a = self.get_artifact_fullname(element, key_a)
         ref_b = self.get_artifact_fullname(element, key_b)
@@ -448,6 +450,13 @@ class CASCache(ArtifactCache):
         except FileNotFoundError as e:
             raise ArtifactError("Attempt to access unavailable artifact: {}".format(e)) from e
 
+    def calculate_cache_size(self):
+        if self.cache_size is None:
+            self.cache_size = utils._get_dir_size(self.casdir)
+            self.estimated_size = self.cache_size
+
+        return self.cache_size
+
     # list_artifacts():
     #
     # List cached artifacts in Least Recently Modified (LRM) order.
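
The new calculate_cache_size() above relies on utils._get_dir_size(), which is not part of this diff. Assuming it simply totals the size of every file under the CAS directory, a minimal stand-in could look like the following (the function name and the symlink handling are assumptions, not the actual helper):

    import os

    def get_dir_size(path):
        # Sum the size of every regular file below 'path'; skip symlinks so
        # the same object is not counted through more than one name.
        total = 0
        for root, _, files in os.walk(path):
            for name in files:
                filepath = os.path.join(root, name)
                if not os.path.islink(filepath):
                    total += os.path.getsize(filepath)
        return total
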
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 0030f5c..9815586 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1 +1,2 @@
 from .elementjob import ElementJob
+from .cachesizejob import CacheSizeJob
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
new file mode 100644
index 0000000..f4e2393
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -0,0 +1,91 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message, MessageType
+
+
+class CacheSizeJob(Job):
+    def __init__(self, *args, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete_cb = complete_cb
+        self._cache = Platform._instance.artifactcache
+
+    def _child_process(self):
+        return self._cache.calculate_cache_size()
+
+    def _parent_complete(self, success, result):
+        self._cache._set_cache_size(result)
+        if self._complete_cb:
+            self._complete_cb(result)
+
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+        yield self._logfile
+        self._logfile = None
+
+    # message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(Message(None, message_type, message, **args))
+
+    def _child_log(self, message):
+        with open(self._logfile, 'a+') as log:
+            INDENT = "    "
+            EMPTYTIME = "--:--:--"
+
+            template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+            detail = ''
+            if message.detail is not None:
+                template += "\n\n{detail}"
+                detail = message.detail.rstrip('\n')
+                detail = INDENT + INDENT.join(detail.splitlines(True))
+
+            timecode = EMPTYTIME
+            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+                minutes, seconds = divmod(remainder, 60)
+                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+            message_text = template.format(timecode=timecode,
+                                           type=message.message_type.upper(),
+                                           name='cache_size',
+                                           message=message.message,
+                                           detail=detail)
+
+            log.write('{}\n'.format(message_text))
+            log.flush()
+
+        return message
+
+    def _child_process_data(self):
+        return {}
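
CacheSizeJob runs the directory walk in a child process and hands only the resulting integer back via _parent_complete(). The Job base class it builds on is not shown in this commit; the stripped-down sketch below (plain multiprocessing, hypothetical names) only illustrates that child/parent split, not the actual Job machinery:

    import multiprocessing
    import os

    def compute_size(directory, queue):
        # Stand-in for the work CacheSizeJob does in its child process
        # (top-level files only, for brevity)
        size = sum(entry.stat(follow_symlinks=False).st_size
                   for entry in os.scandir(directory) if entry.is_file())
        queue.put(size)

    def on_complete(result):
        # Stand-in for _parent_complete(), which records the size in the parent
        print("cache size is now {} bytes".format(result))

    if __name__ == '__main__':
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=compute_size, args=(os.getcwd(), queue))
        proc.start()
        result = queue.get()
        proc.join()
        on_complete(result)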
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 7f8ac9e..376ef5a 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -51,10 +51,27 @@ class BuildQueue(Queue):
 
         return QueueStatus.READY
 
+    def _check_cache_size(self, job, element):
+        if not job.child_data:
+            return
+
+        artifact_size = job.child_data.get('artifact_size', False)
+
+        if artifact_size:
+            cache = element._get_artifact_cache()
+            cache._add_artifact_size(artifact_size)
+
+            if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
+                self._scheduler._check_cache_size_real()
+
     def done(self, job, element, result, success):
 
         if success:
             # Inform element in main process that assembly is done
             element._assemble_done()
 
+        # This has to be done after _assemble_done, such that the
+        # element may register its cache key as required
+        self._check_cache_size(job, element)
+
         return True
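
The cache._add_artifact_size() and cache.get_approximate_cache_size() calls above are not defined in this commit. A hypothetical, self-contained version that is merely consistent with how they are called here might be:

    class ApproximateCacheSize:
        # Hypothetical helpers; the real ArtifactCache methods live
        # elsewhere in this branch and may differ.
        def __init__(self, initial_size=0):
            self.estimated_size = initial_size

        def _add_artifact_size(self, artifact_size):
            # Called by BuildQueue after each successful build
            self.estimated_size += artifact_size

        def get_approximate_cache_size(self):
            # Cheap accessor used for the quota comparison
            return self.estimated_size

    cache = ApproximateCacheSize(initial_size=2 * 1024 ** 3)  # assume ~2 GiB cached
    cache._add_artifact_size(256 * 1024 ** 2)                 # a 256 MiB artifact lands
    print(cache.get_approximate_cache_size())                 # -> 2415919104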
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index efaa59e..430afc4 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -59,6 +59,11 @@ class PullQueue(Queue):
 
         element._pull_done()
 
+        # Build jobs will check the "approximate" size first. Since we
+        # do not get an artifact size from pull jobs, we have to
+        # actually check the cache size.
+        self._scheduler._check_cache_size_real()
+
         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
         return result
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index bc182db..a11134c 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -27,7 +27,8 @@ import datetime
 from contextlib import contextmanager
 
 # Local imports
-from .resources import Resources
+from .resources import Resources, ResourceType
+from .jobs import CacheSizeJob
 
 
 # A decent return code for Scheduler.run()
@@ -312,6 +313,15 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()
 
+    def _check_cache_size_real(self):
+        logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
+        job = CacheSizeJob(self, 'cache_size', logpath,
+                           resources=[ResourceType.CACHE,
+                                      ResourceType.PROCESS],
+                           exclusive_resources=[ResourceType.CACHE],
+                           complete_cb=None)
+        self.schedule_jobs([job])
+
     # _suspend_jobs()
     #
     # Suspend all ongoing jobs.