You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:08 UTC

[buildstream] 10/32: Calculate the artifact cache size

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 66b25acd197b4739a2db3b27d12a423b18348601
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Mar 20 09:24:01 2018 +0000

    Calculate the artifact cache size
---
 buildstream/_artifactcache/artifactcache.py | 70 ++++++++++++++++++++++
 buildstream/_artifactcache/ostreecache.py   | 12 ++++
 buildstream/_artifactcache/tarcache.py      | 20 ++++++-
 buildstream/_scheduler/jobs/__init__.py     |  1 +
 buildstream/_scheduler/jobs/cachesizejob.py | 91 +++++++++++++++++++++++++++++
 buildstream/_scheduler/jobs/elementjob.py   |  6 ++
 buildstream/_scheduler/jobs/job.py          |  1 +
 buildstream/_scheduler/queues/buildqueue.py | 19 +++++-
 buildstream/_scheduler/queues/fetchqueue.py |  2 +-
 buildstream/_scheduler/queues/pullqueue.py  |  7 ++-
 buildstream/_scheduler/queues/pushqueue.py  |  2 +-
 buildstream/_scheduler/queues/queue.py      | 21 ++++---
 buildstream/_scheduler/queues/trackqueue.py |  2 +-
 buildstream/element.py                      | 23 +++++++-
 14 files changed, 260 insertions(+), 17 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index e9611bf..3b2aaf5 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -62,6 +62,9 @@ class ArtifactCache():
     def __init__(self, context):
         self.context = context
         self.extractdir = os.path.join(context.artifactdir, 'extract')
+        self.max_size = context.cache_quota
+        self.estimated_size = None
+
         self.global_remote_specs = []
         self.project_remote_specs = {}
 
@@ -162,6 +165,35 @@ class ArtifactCache():
                                   (str(provenance)))
         return cache_specs
 
+    # get_approximate_cache_size()
+    #
+    # A cheap method that aims to serve as an upper limit on the
+    # artifact cache size.
+    #
+    # The cache size reported by this function will normally be larger
+    # than the real cache size, since it is calculated using the
+    # pre-commit artifact size, but for very small artifacts in
+    # certain caches additional overhead could cause this to be
+    # smaller than, but close to, the actual size.
+    #
+    # Nonetheless, in practice this should be safe to use as an upper
+    # limit on the cache size.
+    #
+    # If the cache has built-in constant-time size reporting, please
+    # feel free to override this method with a more accurate
+    # implementation.
+    #
+    # Returns:
+    #     (int) An approximation of the artifact cache size.
+    #
+    def get_approximate_cache_size(self):
+        # If we don't currently have an estimate, figure out the real
+        # cache size.
+        if self.estimated_size is None:
+            self.estimated_size = self.calculate_cache_size()
+
+        return self.estimated_size
+
     ################################################
     # Abstract methods for subclasses to implement #
     ################################################
@@ -334,6 +366,20 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement link_key()"
                         .format(kind=type(self).__name__))
 
+    # calculate_cache_size()
+    #
+    # Return the real artifact cache size.
+    #
+    # Implementations should also use this to update estimated_size.
+    #
+    # Returns:
+    #
+    # (int) The size of the artifact cache.
+    #
+    def calculate_cache_size(self):
+        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
+                        .format(kind=type(self).__name__))
+
     ################################################
     #               Local Private Methods          #
     ################################################
@@ -375,6 +421,30 @@ class ArtifactCache():
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
 
+    # _add_artifact_size()
+    #
+    # Since we cannot keep track of the cache size between processes,
+    # this method will be called by the main process every time a
+    # child process that added something to the cache finishes.
+    #
+    # This will then add the reported size to
+    # ArtifactCache.estimated_size.
+    #
+    def _add_artifact_size(self, artifact_size):
+        if not self.estimated_size:
+            self.estimated_size = self.calculate_cache_size()
+
+        self.estimated_size += artifact_size
+
+    # _set_cache_size()
+    #
+    # Similarly to the above method, when we calculate the actual size
+    # in a child process, we can't update it there. We instead pass the
+    # value back to the main process and update it here.
+    #
+    def _set_cache_size(self, cache_size):
+        self.estimated_size = cache_size
+
 
 # _configured_remote_artifact_cache_specs():
 #
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
old mode 100644
new mode 100755
index f71bee0..8d22917
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -59,6 +59,9 @@ class OSTreeCache(ArtifactCache):
         self._has_fetch_remotes = False
         self._has_push_remotes = False
 
+        # A cached artifact cache size (irony?)
+        self.cache_size = None
+
     ################################################
     #     Implementation of abstract methods       #
     ################################################
@@ -137,6 +140,8 @@ class OSTreeCache(ArtifactCache):
         except OSTreeError as e:
             raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
 
+        self.cache_size = None
+
     def can_diff(self):
         return True
 
@@ -204,6 +209,13 @@ class OSTreeCache(ArtifactCache):
 
         return any_pushed
 
+    def calculate_cache_size(self):
+        if self.cache_size is None:
+            self.cache_size = utils._get_dir_size(self.repo.get_path().get_path())
+            self.estimated_size = self.cache_size
+
+        return self.cache_size
+
     def initialize_remotes(self, *, on_failure=None):
         remote_specs = self.global_remote_specs.copy()
 
diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py
index d995929..3cae9d9 100644
--- a/buildstream/_artifactcache/tarcache.py
+++ b/buildstream/_artifactcache/tarcache.py
@@ -36,6 +36,7 @@ class TarCache(ArtifactCache):
 
         self.tardir = os.path.join(context.artifactdir, 'tar')
         os.makedirs(self.tardir, exist_ok=True)
+        self.cache_size = None
 
     ################################################
     #     Implementation of abstract methods       #
@@ -52,7 +53,7 @@ class TarCache(ArtifactCache):
         artifact = os.path.join(self.tardir, artifact_name + '.tar.bz2')
         size = os.stat(artifact, follow_symlinks=False).st_size
         os.remove(artifact)
-        return size
+        self.cache_size -= size
 
     def commit(self, element, content, keys):
         os.makedirs(os.path.join(self.tardir, element._get_project().name, element.normal_name), exist_ok=True)
@@ -66,6 +67,8 @@ class TarCache(ArtifactCache):
 
                 _Tar.archive(os.path.join(self.tardir, ref), key, temp)
 
+            self.cache_size = None
+
     def extract(self, element, key):
 
         fullname = self.get_artifact_fullname(element, key)
@@ -100,6 +103,21 @@ class TarCache(ArtifactCache):
 
         return dest
 
+    # calculate_cache_size()
+    #
+    # Return the artifact cache size.
+    #
+    # Returns:
+    #
+    # (int) The size of the artifact cache.
+    #
+    def calculate_cache_size(self):
+        if self.cache_size is None:
+            self.cache_size = utils._get_dir_size(self.tardir)
+            self.estimated_size = self.cache_size
+
+        return self.cache_size
+
 
 # _tarpath()
 #
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
index 0030f5c..9815586 100644
--- a/buildstream/_scheduler/jobs/__init__.py
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -1 +1,2 @@
 from .elementjob import ElementJob
+from .cachesizejob import CacheSizeJob
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
new file mode 100644
index 0000000..897e896
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -0,0 +1,91 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message, MessageType
+
+
+class CacheSizeJob(Job):
+    def __init__(self, *args, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete_cb = complete_cb
+        self._cache = Platform._instance.artifactcache
+
+    def _child_process(self):
+        return self._cache.calculate_cache_size()
+
+    def _parent_complete(self, success, result):
+        self._cache._set_cache_size(result)
+        if self._complete_cb:
+            self._complete_cb(result)
+
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+        yield self._logfile
+        self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(Message(None, message_type, message, **args))
+
+    def _child_log(self, message):
+        with open(self._logfile, 'a+') as log:
+            INDENT = "    "
+            EMPTYTIME = "--:--:--"
+
+            template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+            detail = ''
+            if message.detail is not None:
+                template += "\n\n{detail}"
+                detail = message.detail.rstrip('\n')
+                detail = INDENT + INDENT.join(detail.splitlines(True))
+
+            timecode = EMPTYTIME
+            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+                minutes, seconds = divmod(remainder, 60)
+                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+            message_text = template.format(timecode=timecode,
+                                           type=message.message_type.upper(),
+                                           name='cache_size',
+                                           message=message.message,
+                                           detail=detail)
+
+            log.write('{}\n'.format(message_text))
+            log.flush()
+
+        return message
+
+    def _child_process_data(self):
+        return {}
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 36e7c1d..45c16eb 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -210,7 +210,13 @@ class ElementJob(Job):
         data = {}
 
         workspace = self._element._get_workspace()
+        artifact_size = self._element._get_artifact_size()
+        cache_size = self._element._get_artifact_cache().cache_size
+
         if workspace is not None:
             data['workspace'] = workspace.to_dict()
+        if artifact_size is not None:
+            data['artifact_size'] = artifact_size
+        data['cache_size'] = cache_size
 
         return data
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index cf5bf07..fb25901 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -58,6 +58,7 @@ class JobType():
     BUILD = 3
     PULL = 4
     PUSH = 5
+    SIZE = 6
 
 
 # Job()
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 0c75538..c023973 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -20,7 +20,7 @@
 
 import os
 from . import Queue, QueueStatus, QueueType
-from ..jobs import JobType
+from ..jobs import CacheSizeJob, JobType
 
 
 # A queue which assembles elements
@@ -53,10 +53,25 @@ class BuildQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def _check_cache_size(self, job, element):
+        if not job.child_data:
+            return
+
+        artifact_size = job.child_data.get('artifact_size', False)
+
+        if artifact_size:
+            cache = element._get_artifact_cache()
+            cache._add_artifact_size(artifact_size)
+
+            if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
+                self._scheduler._check_cache_size_real()
+
+    def done(self, job, element, result, success):
 
         if success:
             # Inform element in main process that assembly is done
             element._assemble_done()
 
+        self._check_cache_size(job, element)
+
         return True
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 2438a9a..a4ec8ad 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -68,7 +68,7 @@ class FetchQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index 1fc4364..20e7843 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -53,13 +53,18 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
 
         element._pull_done()
 
+        # Build jobs will check the "approximate" size first. Since we
+        # do not get an artifact size from pull jobs, we have to
+        # actually check the cache size.
+        self._scheduler._check_cache_size_real()
+
         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
         return result
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index aa5540e..77f2e02 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -42,7 +42,7 @@ class PushQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 39e431b..2ff10d8 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -145,6 +145,7 @@ class Queue():
     # Abstract method for handling a successful job completion.
     #
     # Args:
+    #    job (Job): The job which completed processing
     #    element (Element): The element which completed processing
     #    result (any): The return value of the process() implementation
     #    success (bool): True if the process() implementation did not
@@ -154,7 +155,7 @@ class Queue():
     #    (bool): True if the element should appear to be processsed,
     #            Otherwise False will count the element as "skipped"
     #
-    def done(self, element, result, success):
+    def done(self, job, element, result, success):
         pass
 
     #####################################################
@@ -224,6 +225,7 @@ class Queue():
     def process_ready(self):
         scheduler = self._scheduler
         unready = []
+        ready = []
 
         while self._wait_queue and scheduler.get_job_token(self.queue_type):
             element = self._wait_queue.popleft()
@@ -248,12 +250,14 @@ class Queue():
                              action_cb=self.process,
                              complete_cb=self._job_done,
                              max_retries=self._max_retries)
-            scheduler.job_starting(job, element)
+            ready.append(job)
 
         # These were not ready but were in the beginning, give em
         # first priority again next time around
         self._wait_queue.extendleft(unready)
 
+        return ready
+
     #####################################################
     #                 Private Methods                   #
     #####################################################
@@ -270,7 +274,7 @@ class Queue():
     def _update_workspaces(self, element, job):
         workspace_dict = None
         if job.child_data:
-            workspace_dict = job.child_data['workspace']
+            workspace_dict = job.child_data.get('workspace', None)
 
         # Handle any workspace modifications now
         #
@@ -298,17 +302,17 @@ class Queue():
     #
     def _job_done(self, job, element, success, result):
 
-        # Remove from our jobs
-        self.active_jobs.remove(job)
-
-        # Update workspaces in the main task before calling any queue implementation
+        # Update values that need to be synchronized in the main task
+        # before calling any queue implementation
         self._update_workspaces(element, job)
+        if job.child_data:
+            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
 
         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            processed = self.done(element, result, success)
+            processed = self.done(job, element, result, success)
 
         except BstError as e:
 
@@ -346,7 +350,6 @@ class Queue():
                 self.failed_elements.append(element)
 
         # Give the token for this job back to the scheduler
-        # immediately before invoking another round of scheduling
         self._scheduler.put_job_token(self.queue_type)
 
     # Convenience wrapper for Queue implementations to send
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index a371e52..df3f7b1 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -49,7 +49,7 @@ class TrackQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/element.py b/buildstream/element.py
index fc21f80..98a39a4 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -225,6 +225,7 @@ class Element(Plugin):
         self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
         self.__tainted = None                   # Whether the artifact is tainted and should not be shared
         self.__required = False                 # Whether the artifact is required in the current session
+        self.__artifact_size = None             # The size of data committed to the artifact cache
 
         # hash tables of loaded artifact metadata, hashed by key
         self.__metadata_keys = {}                     # Strong and weak keys for this key
@@ -1524,7 +1525,8 @@ class Element(Plugin):
                 }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 
                 with self.timed_activity("Caching artifact"):
-                    self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
+                    self.__artifacts._commit(self, assembledir, self.__get_cache_keys_for_commit())
+                    self.__artifact_size = utils._get_dir_size(assembledir)
 
             # Finally cleanup the build dir
             cleanup_rootdir()
@@ -1763,6 +1765,25 @@ class Element(Plugin):
         workspaces = self._get_context().get_workspaces()
         return workspaces.get_workspace(self._get_full_name())
 
+    # _get_artifact_size()
+    #
+    # Get the size of the artifact produced by this element in the
+    # current pipeline - this is only recorded when the artifact is
+    # committed after assembly; otherwise (e.g. pulled) it is None.
+    #
+    # Note that this is the size of an artifact *before* committing it
+    # to the cache, the size on disk may differ. It can act as an
+    # approximate guide for when to do a proper size calculation.
+    #
+    # Returns:
+    #    (int|None): The size of the artifact
+    #
+    def _get_artifact_size(self):
+        return self.__artifact_size
+
+    def _get_artifact_cache(self):
+        return self.__artifacts
+
     # _write_script():
     #
     # Writes a script to the given directory.