You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:39:33 UTC

[buildstream] branch 135-expire-artifacts-in-local-cache-clean created (now a2d7758)

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a change to branch 135-expire-artifacts-in-local-cache-clean
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at a2d7758  WIP: Perform artifact cache expiry

This branch includes the following new commits:

     new 1cffee6  Bump required python version to 3.5
     new 47b6b6e  _exceptions.py: Add `detail` to ArtifactErrors
     new 70cd94a  buildstream/_scheduler/*queue.py: Move queues to a subdirectory
     new 7094526  Make Jobs abstract and element-independent
     new a2d7758  WIP: Perform artifact cache expiry

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 03/05: buildstream/_scheduler/*queue.py: Move queues to a subdirectory

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch 135-expire-artifacts-in-local-cache-clean
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 70cd94afa9a05ef0005cf85aa0db1d795b64bbd3
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Mon Jul 16 14:38:25 2018 +0100

    buildstream/_scheduler/*queue.py: Move queues to a subdirectory
---
 buildstream/_scheduler/__init__.py                | 12 ++++++------
 buildstream/_scheduler/queues/__init__.py         |  1 +
 buildstream/_scheduler/{ => queues}/buildqueue.py |  0
 buildstream/_scheduler/{ => queues}/fetchqueue.py |  2 +-
 buildstream/_scheduler/{ => queues}/pullqueue.py  |  0
 buildstream/_scheduler/{ => queues}/pushqueue.py  |  0
 buildstream/_scheduler/{ => queues}/queue.py      |  6 +++---
 buildstream/_scheduler/{ => queues}/trackqueue.py |  4 ++--
 buildstream/_scheduler/scheduler.py               |  2 +-
 9 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index 80523db..a53a133 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -17,12 +17,12 @@
 #  Authors:
 #        Tristan Van Berkom <tr...@codethink.co.uk>
 
-from .queue import Queue, QueueStatus, QueueType
+from .queues import Queue, QueueStatus, QueueType
 
-from .fetchqueue import FetchQueue
-from .trackqueue import TrackQueue
-from .buildqueue import BuildQueue
-from .pushqueue import PushQueue
-from .pullqueue import PullQueue
+from .queues.fetchqueue import FetchQueue
+from .queues.trackqueue import TrackQueue
+from .queues.buildqueue import BuildQueue
+from .queues.pushqueue import PushQueue
+from .queues.pullqueue import PullQueue
 
 from .scheduler import Scheduler, SchedStatus
diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py
new file mode 100644
index 0000000..b9acef1
--- /dev/null
+++ b/buildstream/_scheduler/queues/__init__.py
@@ -0,0 +1 @@
+from .queue import Queue, QueueStatus, QueueType
diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
similarity index 100%
rename from buildstream/_scheduler/buildqueue.py
rename to buildstream/_scheduler/queues/buildqueue.py
diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
similarity index 98%
rename from buildstream/_scheduler/fetchqueue.py
rename to buildstream/_scheduler/queues/fetchqueue.py
index 24512bd..bdff156 100644
--- a/buildstream/_scheduler/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -19,7 +19,7 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # BuildStream toplevel imports
-from .. import Consistency
+from ... import Consistency
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
similarity index 100%
rename from buildstream/_scheduler/pullqueue.py
rename to buildstream/_scheduler/queues/pullqueue.py
diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
similarity index 100%
rename from buildstream/_scheduler/pushqueue.py
rename to buildstream/_scheduler/queues/pushqueue.py
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queues/queue.py
similarity index 98%
rename from buildstream/_scheduler/queue.py
rename to buildstream/_scheduler/queues/queue.py
index 15caf83..d0c4828 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -24,11 +24,11 @@ from enum import Enum
 import traceback
 
 # Local imports
-from .job import Job
+from ..job import Job
 
 # BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType
+from ..._exceptions import BstError, set_last_task_error
+from ..._message import Message, MessageType
 
 
 # Indicates the kind of activity
diff --git a/buildstream/_scheduler/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
similarity index 97%
rename from buildstream/_scheduler/trackqueue.py
rename to buildstream/_scheduler/queues/trackqueue.py
index e48e1ae..3a65f01 100644
--- a/buildstream/_scheduler/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -19,8 +19,8 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # BuildStream toplevel imports
-from ..plugin import _plugin_lookup
-from .. import SourceError
+from ...plugin import _plugin_lookup
+from ... import SourceError
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index f8a66ae..7bfbc95 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -26,7 +26,7 @@ import datetime
 from contextlib import contextmanager
 
 # Local imports
-from .queue import QueueType
+from .queues import QueueType
 
 
 # A decent return code for Scheduler.run()


[buildstream] 04/05: Make Jobs abstract and element-independent

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch 135-expire-artifacts-in-local-cache-clean
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 7094526b28db94dd8a9e190610b73a228effb7b6
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Mon Jul 16 15:31:55 2018 +0100

    Make Jobs abstract and element-independent
---
 buildstream/_frontend/app.py                |  35 +--
 buildstream/_frontend/status.py             |  27 ++-
 buildstream/_scheduler/__init__.py          |   2 +-
 buildstream/_scheduler/jobs/__init__.py     |   1 +
 buildstream/_scheduler/jobs/elementjob.py   | 217 +++++++++++++++++
 buildstream/_scheduler/{ => jobs}/job.py    | 351 ++++++++++++++++------------
 buildstream/_scheduler/queues/__init__.py   |   2 +-
 buildstream/_scheduler/queues/buildqueue.py |   7 +-
 buildstream/_scheduler/queues/fetchqueue.py |   7 +-
 buildstream/_scheduler/queues/pullqueue.py  |   7 +-
 buildstream/_scheduler/queues/pushqueue.py  |   7 +-
 buildstream/_scheduler/queues/queue.py      | 136 ++++++-----
 buildstream/_scheduler/queues/trackqueue.py |   7 +-
 buildstream/_scheduler/resources.py         | 105 +++++++++
 buildstream/_scheduler/scheduler.py         | 213 ++++++++---------
 15 files changed, 758 insertions(+), 366 deletions(-)

diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 4675b0e..de910af 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -492,30 +492,37 @@ class App():
     def _tick(self, elapsed):
         self._maybe_render_status()
 
-    def _job_started(self, element, action_name):
-        self._status.add_job(element, action_name)
+    def _job_started(self, job):
+        self._status.add_job(job)
         self._maybe_render_status()
 
-    def _job_completed(self, element, queue, action_name, success):
-        self._status.remove_job(element, action_name)
+    def _job_completed(self, job, success):
+        self._status.remove_job(job)
         self._maybe_render_status()
 
         # Dont attempt to handle a failure if the user has already opted to
         # terminate
         if not success and not self.stream.terminated:
 
-            # Get the last failure message for additional context
-            failure = self._fail_messages.get(element._get_unique_id())
+            if hasattr(job, 'element'):
+                element = job.element
+                queue = job.queue
 
-            # XXX This is dangerous, sometimes we get the job completed *before*
-            # the failure message reaches us ??
-            if not failure:
-                self._status.clear()
-                click.echo("\n\n\nBUG: Message handling out of sync, " +
-                           "unable to retrieve failure message for element {}\n\n\n\n\n"
-                           .format(element), err=True)
+                # Get the last failure message for additional context
+                failure = self._fail_messages.get(element._get_unique_id())
+
+                # XXX This is dangerous, sometimes we get the job completed *before*
+                # the failure message reaches us ??
+                if not failure:
+                    self._status.clear()
+                    click.echo("\n\n\nBUG: Message handling out of sync, " +
+                               "unable to retrieve failure message for element {}\n\n\n\n\n"
+                               .format(element), err=True)
+                else:
+                    self._handle_failure(element, queue, failure)
             else:
-                self._handle_failure(element, queue, failure)
+                click.echo("\nTerminating all jobs\n", err=True)
+                self.stream.terminate()
 
     def _handle_failure(self, element, queue, failure):
 
diff --git a/buildstream/_frontend/status.py b/buildstream/_frontend/status.py
index 3f66e00..7a2e719 100644
--- a/buildstream/_frontend/status.py
+++ b/buildstream/_frontend/status.py
@@ -77,9 +77,9 @@ class Status():
     #    element (Element): The element of the job to track
     #    action_name (str): The action name for this job
     #
-    def add_job(self, element, action_name):
+    def add_job(self, job):
         elapsed = self._stream.elapsed_time
-        job = _StatusJob(self._context, element, action_name, self._content_profile, self._format_profile, elapsed)
+        job = _StatusJob(self._context, job, self._content_profile, self._format_profile, elapsed)
         self._jobs.append(job)
         self._need_alloc = True
 
@@ -91,7 +91,13 @@ class Status():
     #    element (Element): The element of the job to track
     #    action_name (str): The action name for this job
     #
-    def remove_job(self, element, action_name):
+    def remove_job(self, job):
+        action_name = job.action_name
+        if not hasattr(job, 'element'):
+            element = None
+        else:
+            element = job.element
+
         self._jobs = [
             job for job in self._jobs
             if not (job.element is element and
@@ -358,15 +364,19 @@ class _StatusHeader():
 #
 # Args:
 #    context (Context): The Context
-#    element (Element): The element being processed
-#    action_name (str): The name of the action
+#    job (Job): The job being processed
 #    content_profile (Profile): Formatting profile for content text
 #    format_profile (Profile): Formatting profile for formatting text
 #    elapsed (datetime): The offset into the session when this job is created
 #
 class _StatusJob():
 
-    def __init__(self, context, element, action_name, content_profile, format_profile, elapsed):
+    def __init__(self, context, job, content_profile, format_profile, elapsed):
+        action_name = job.action_name
+        if not hasattr(job, 'element'):
+            element = None
+        else:
+            element = job.element
 
         #
         # Public members
@@ -374,6 +384,7 @@ class _StatusJob():
         self.element = element            # The Element
         self.action_name = action_name    # The action name
         self.size = None                  # The number of characters required to render
+        self.full_name = element._get_full_name() if element else action_name
 
         #
         # Private members
@@ -386,7 +397,7 @@ class _StatusJob():
         # Calculate the size needed to display
         self.size = 10  # Size of time code with brackets
         self.size += len(action_name)
-        self.size += len(element._get_full_name())
+        self.size += len(self.full_name)
         self.size += 3  # '[' + ':' + ']'
 
     # render()
@@ -403,7 +414,7 @@ class _StatusJob():
             self._format_profile.fmt(']')
 
         # Add padding after the display name, before terminating ']'
-        name = self.element._get_full_name() + (' ' * padding)
+        name = self.full_name + (' ' * padding)
         text += self._format_profile.fmt('[') + \
             self._content_profile.fmt(self.action_name) + \
             self._format_profile.fmt(':') + \
diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index a53a133..8e1140f 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -17,7 +17,7 @@
 #  Authors:
 #        Tristan Van Berkom <tr...@codethink.co.uk>
 
-from .queues import Queue, QueueStatus, QueueType
+from .queues import Queue, QueueStatus
 
 from .queues.fetchqueue import FetchQueue
 from .queues.trackqueue import TrackQueue
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 0000000..0030f5c
--- /dev/null
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1 @@
+from .elementjob import ElementJob
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 0000000..68f4e04
--- /dev/null
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,217 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+from ...plugin import _plugin_lookup
+from ... import _signals
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+#    scheduler (Scheduler): The scheduler
+#    action_name (str): The queue action name
+#    max_retries (int): The maximum number of retries
+#    action_cb (callable): The function to execute on the child
+#    complete_cb (callable): The function to execute when the job completes
+#    element (Element): The element to work on
+#    kwargs: Remaining Job() constructor arguments
+#
+# Here is the calling signature of the action_cb:
+#
+#     action_cb():
+#
+#     This function will be called in the child task
+#
+#     Args:
+#        element (Element): The element passed to the Job() constructor
+#
+#     Returns:
+#        (object): Any abstract simple python object, including a string, int,
+#                  bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+#     complete_cb():
+#
+#     This function will be called when the child task completes
+#
+#     Args:
+#        job (Job): The job object which completed
+#        element (Element): The element passed to the Job() constructor
+#        success (bool): True if the action_cb did not raise an exception
+#        result (object): The deserialized object returned by the `action_cb`, or None
+#                         if `success` is False
+#
+class ElementJob(Job):
+    def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.queue = queue
+        self._element = element
+        self._action_cb = action_cb            # The action callable function
+        self._complete_cb = complete_cb        # The complete callable function
+
+    @property
+    def element(self):
+        return self._element
+
+    # _child_process()
+    #
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        return self._action_cb(self._element)
+
+    def _parent_complete(self, success, result):
+        self._complete_cb(self, self._element, success, self._result)
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+
+        with open(self._logfile, 'a') as log:
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently had something happening in the I/O stack
+                # then trying to reenter the I/O stack will fire a runtime error.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    # FIXME: Better logging
+
+                    log.write('\n\nAction {} for element {} forcefully terminated\n'
+                              .format(self.action_name, self._element.name))
+                    log.flush()
+                except RuntimeError:
+                    os.fsync(log.fileno())
+
+            self._element._set_log_handle(log)
+            with _signals.terminator(flush_log):
+                self._print_start_message(self._element, self._logfile)
+                yield self._logfile
+            self._element._set_log_handle(None)
+            self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(
+            Message(self._element._get_unique_id(),
+                    message_type,
+                    message,
+                    **args))
+
+    def _print_start_message(self, element, logfile):
+        self._message(MessageType.START, self.action_name, logfile=logfile)
+
+        # Print the element's environment at the beginning of any element's log file.
+        #
+        # This should probably be omitted for non-build tasks but it's harmless here
+        elt_env = element.get_environment()
+        env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+        self._message(MessageType.LOG,
+                      "Build environment for element {}".format(element.name),
+                      detail=env_dump, logfile=logfile)
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # Arguments:
+    #     message (str): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        # Tag them on the way out the door...
+        message.action_name = self.action_name
+        message.task_id = self._element._get_unique_id()
+
+        # Use the plugin for the task for the output, not a plugin
+        # which might be acting on behalf of the task
+        plugin = _plugin_lookup(message.task_id)
+
+        with plugin._output_file() as output:
+            message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
+            output.write('{}\n'.format(message_text))
+            output.flush()
+
+        return message
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        data = {}
+
+        workspace = self._element._get_workspace()
+
+        if workspace is not None:
+            data['workspace'] = workspace.to_dict()
+
+        return data
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py
similarity index 65%
rename from buildstream/_scheduler/job.py
rename to buildstream/_scheduler/jobs/job.py
index 8b9af93..580f9ff 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -26,20 +26,21 @@ import datetime
 import traceback
 import asyncio
 import multiprocessing
-from ruamel import yaml
+from contextlib import contextmanager
+
+import psutil
 
 # BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType, unconditional_messages
-from ..plugin import _plugin_lookup
-from .. import _signals, utils
+from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._message import MessageType, unconditional_messages
+from ... import _signals, utils
 
 
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
+        self._message_type = message_type
+        self._message = message
 
 
 # Process class that doesn't call waitpid on its own.
@@ -54,54 +55,50 @@ class Process(multiprocessing.Process):
 # Job()
 #
 # The Job object represents a parallel task, when calling Job.spawn(),
-# the given `action_cb` will be called in parallel to the calling process,
-# and `complete_cb` will be called with the action result in the calling
-# process when the job completes.
+# the given `Job._child_process` will be called in parallel to the
+# calling process, and `Job._parent_complete` will be called with the
+# action result in the calling process when the job completes.
 #
 # Args:
 #    scheduler (Scheduler): The scheduler
-#    element (Element): The element to operate on
 #    action_name (str): The queue action name
-#    action_cb (callable): The action function
-#    complete_cb (callable): The function to call when complete
+#    logfile (str): A template string that points to the logfile
+#                   that should be used - should contain {pid}.
+#    resources (iter(ResourceType)) - A set of resources this job
+#                                     wants to use.
+#    exclusive_resources (iter(ResourceType)) - A set of resources
+#                                               this job wants to use
+#                                               exclusively.
 #    max_retries (int): The maximum number of retries
 #
-# Here is the calling signature of the action_cb:
-#
-#     action_cb():
-#
-#     This function will be called in the child task
-#
-#     Args:
-#        element (Element): The element passed to the Job() constructor
-#
-#     Returns:
-#        (object): Any abstract simple python object, including a string, int,
-#                  bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-#     complete_cb():
-#
-#     This function will be called when the child task completes
-#
-#     Args:
-#        job (Job): The job object which completed
-#        element (Element): The element passed to the Job() constructor
-#        success (bool): True if the action_cb did not raise an exception
-#        result (object): The deserialized object returned by the `action_cb`, or None
-#                         if `success` is False
-#
 class Job():
 
-    def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
+    def __init__(self, scheduler, action_name, logfile, *,
+                 resources=None, exclusive_resources=None, max_retries=0):
+
+        if resources is None:
+            resources = set()
+        else:
+            resources = set(resources)
+        if exclusive_resources is None:
+            exclusive_resources = set()
+        else:
+            exclusive_resources = set(resources)
+
+        # Ensure nobody tries not use an exclusive resource.
+        assert exclusive_resources <= resources, "All exclusive resources must also be resources!"
 
         #
         # Public members
         #
-        self.element = element           # The element we're processing
         self.action_name = action_name   # The action name for the Queue
-        self.workspace_dict = None       # A serialized Workspace object, after any modifications
+        self.child_data = None           # Data to be sent to the main process
+
+        # The resources this job wants to access
+        self.resources = resources
+        # Resources this job needs to access exclusively, i.e., no
+        # other job should be allowed to access them
+        self.exclusive_resources = exclusive_resources
 
         #
         # Private members
@@ -110,13 +107,12 @@ class Job():
         self._queue = multiprocessing.Queue()  # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
-        self._action_cb = action_cb            # The action callable function
-        self._complete_cb = complete_cb        # The complete callable function
         self._listening = False                # Whether the parent is currently listening
         self._suspended = False                # Whether this job is currently suspended
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
+        self._logfile = logfile
 
     # spawn()
     #
@@ -152,8 +148,7 @@ class Job():
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self._message(self.element, MessageType.STATUS,
-                      "{} terminating".format(self.action_name))
+        self._message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -184,9 +179,15 @@ class Job():
     def kill(self):
 
         # Force kill
-        self._message(self.element, MessageType.WARN,
+        self._message(MessageType.WARN,
                       "{} did not terminate gracefully, killing".format(self.action_name))
-        utils._kill_process_tree(self._process.pid)
+
+        try:
+            utils._kill_process_tree(self._process.pid)
+        # This can happen if the process died of its own accord before
+        # we try to kill it
+        except psutil.NoSuchProcess:
+            return
 
     # suspend()
     #
@@ -194,7 +195,7 @@ class Job():
     #
     def suspend(self):
         if not self._suspended:
-            self._message(self.element, MessageType.STATUS,
+            self._message(MessageType.STATUS,
                           "{} suspending".format(self.action_name))
 
             try:
@@ -219,42 +220,152 @@ class Job():
     def resume(self, silent=False):
         if self._suspended:
             if not silent:
-                self._message(self.element, MessageType.STATUS,
+                self._message(MessageType.STATUS,
                               "{} resuming".format(self.action_name))
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
 
     #######################################################
-    #                  Local Private Methods              #
+    #                  Abstract Methods                   #
     #######################################################
+    # _parent_complete()
     #
-    # Methods prefixed with the word 'child' take place in the child process
+    # This will be executed after the job finishes, and is expected to
+    # pass the result to the main thread.
     #
-    # Methods prefixed with the word 'parent' take place in the parent process
+    # Args:
+    #    success (bool): Whether the job was successful.
+    #    result (any): The result returned by _child_process().
     #
-    # Other methods can be called in both child or parent processes
+    def _parent_complete(self, success, result):
+        raise ImplError("Job '{kind}' does not implement _parent_complete()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process()
     #
-    #######################################################
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        raise ImplError("Job '{kind}' does not implement _child_process()"
+                        .format(kind=type(self).__name__))
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        raise ImplError("Job '{kind}' does not implement _child_logging_enabled()"
+                        .format(kind=type(self).__name__))
 
     # _message():
     #
     # Sends a message to the frontend
     #
     # Args:
-    #    plugin (Plugin): The plugin to send a message for
     #    message_type (MessageType): The type of message to send
     #    message (str): The message
     #    kwargs: Remaining Message() constructor arguments
     #
-    def _message(self, plugin, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(plugin._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
+    def _message(self, message_type, message, **kwargs):
+        raise ImplError("Job '{kind}' does not implement _message()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        return {}
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # This method is also expected to add process-specific information
+    # to the message (notably, action_name and task_id).
+    #
+    # Arguments:
+    #     message (str): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        raise ImplError("Job '{kind}' does not implement _child_log()"
+                        .format(kind=type(self).__name__))
+
+    #######################################################
+    #                  Local Private Methods              #
+    #######################################################
+    #
+    # Methods prefixed with the word 'child' take place in the child process
+    #
+    # Methods prefixed with the word 'parent' take place in the parent process
+    #
+    # Other methods can be called in both child or parent processes
+    #
+    #######################################################
+
+    # _format_frontend_message()
+    #
+    # Format a message from the frontend for logging purposes. This
+    # will prepend a time code and add other information to help
+    # determine what happened.
+    #
+    # Args:
+    #    message (Message) - The message to create a text from.
+    #    name (str) - A name for the executing context.
+    #
+    # Returns:
+    #    (str) The text to log.
+    #
+    def _format_frontend_message(self, message, name):
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+
+        detail = ''
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip('\n')
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        return template.format(timecode=timecode,
+                               type=message.message_type.upper(),
+                               name=name,
+                               message=message.message,
+                               detail=detail)
 
     # _child_action()
     #
@@ -265,7 +376,7 @@ class Job():
     #
     def _child_action(self, queue):
 
-        element = self.element
+        logfile = self._logfile
 
         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -301,35 +412,24 @@ class Job():
         # Time, log and and run the action function
         #
         with _signals.suspendable(stop_time, resume_time), \
-            element._logging_enabled(self.action_name) as filename:
-
-            self._message(element, MessageType.START, self.action_name, logfile=filename)
-
-            # Print the element's environment at the beginning of any element's log file.
-            #
-            # This should probably be omitted for non-build tasks but it's harmless here
-            elt_env = element.get_environment()
-            env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-            self._message(element, MessageType.LOG,
-                          "Build environment for element {}".format(element.name),
-                          detail=env_dump, logfile=filename)
+            self._child_logging_enabled(logfile) as filename:
 
             try:
                 # Try the task action
-                result = self._action_cb(element)
+                result = self._child_process()
             except BstError as e:
                 elapsed = datetime.datetime.now() - starttime
 
                 if self._tries <= self._max_retries:
-                    self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+                    self._message(MessageType.FAIL,
+                                  "Try #{} failed, retrying".format(self._tries),
                                   elapsed=elapsed)
                 else:
-                    self._message(element, MessageType.FAIL, str(e),
+                    self._message(MessageType.FAIL, str(e),
                                   elapsed=elapsed, detail=e.detail,
                                   logfile=filename, sandbox=e.sandbox)
 
-                # Report changes in the workspace, even if there was a handled failure
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
 
                 # Report the exception to the parent (for internal testing purposes)
                 self._child_send_error(e)
@@ -343,18 +443,19 @@ class Job():
                 #
                 elapsed = datetime.datetime.now() - starttime
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
-                self._message(element, MessageType.BUG, self.action_name,
+
+                self._message(MessageType.BUG, self.action_name,
                               elapsed=elapsed, detail=detail,
                               logfile=filename)
                 self._child_shutdown(1)
 
             else:
                 # No exception occurred in the action
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - starttime
-                self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+                self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
                               logfile=filename)
 
                 # Shutdown needs to stay outside of the above context manager,
@@ -398,16 +499,6 @@ class Job():
             envelope = Envelope('result', result)
             self._queue.put(envelope)
 
-    # _child_send_workspace()
-    #
-    # Sends the serialized workspace through the message queue, if any
-    #
-    def _child_send_workspace(self):
-        workspace = self.element._get_workspace()
-        if workspace:
-            envelope = Envelope('workspace', workspace.to_dict())
-            self._queue.put(envelope)
-
     # _child_shutdown()
     #
     # Shuts down the child process by cleaning up and exiting the process
@@ -419,44 +510,6 @@ class Job():
         self._queue.close()
         sys.exit(exit_code)
 
-    # _child_log()
-    #
-    # Logs a Message to the process's dedicated log file
-    #
-    # Args:
-    #    plugin (Plugin): The plugin to log for
-    #    message (Message): The message to log
-    #
-    def _child_log(self, plugin, message):
-
-        with plugin._output_file() as output:
-            INDENT = "    "
-            EMPTYTIME = "--:--:--"
-
-            name = '[' + plugin.name + ']'
-
-            fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
-            detail = ''
-            if message.detail is not None:
-                fmt += "\n\n{detail}"
-                detail = message.detail.rstrip('\n')
-                detail = INDENT + INDENT.join(detail.splitlines(True))
-
-            timecode = EMPTYTIME
-            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
-                minutes, seconds = divmod(remainder, 60)
-                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-            message_text = fmt.format(timecode=timecode,
-                                      type=message.message_type.upper(),
-                                      name=name,
-                                      message=message.message,
-                                      detail=detail)
-
-            output.write('{}\n'.format(message_text))
-            output.flush()
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the
@@ -470,16 +523,8 @@ class Job():
     #
     def _child_message_handler(self, message, context):
 
-        # Tag them on the way out the door...
-        message.action_name = self.action_name
-        message.task_id = self.element._get_unique_id()
-
-        # Use the plugin for the task for the output, not a plugin
-        # which might be acting on behalf of the task
-        plugin = _plugin_lookup(message.task_id)
-
         # Log first
-        self._child_log(plugin, message)
+        message = self._child_log(message)
 
         if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
             # Job will be retried, display failures as warnings in the frontend
@@ -519,7 +564,8 @@ class Job():
             self.spawn()
             return
 
-        self._complete_cb(self, self.element, returncode == 0, self._result)
+        self._parent_complete(returncode == 0, self._result)
+        self._scheduler.job_completed(self, returncode == 0)
 
     # _parent_process_envelope()
     #
@@ -536,21 +582,22 @@ class Job():
         if not self._listening:
             return
 
-        if envelope.message_type == 'message':
+        if envelope._message_type == 'message':
             # Propagate received messages from children
             # back through the context.
-            self._scheduler.context.message(envelope.message)
-        elif envelope.message_type == 'error':
+            self._scheduler.context.message(envelope._message)
+        elif envelope._message_type == 'error':
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message['domain'],
-                                envelope.message['reason'])
-        elif envelope.message_type == 'result':
+            set_last_task_error(envelope._message['domain'],
+                                envelope._message['reason'])
+        elif envelope._message_type == 'result':
             assert self._result is None
-            self._result = envelope.message
-        elif envelope.message_type == 'workspace':
-            self.workspace_dict = envelope.message
+            self._result = envelope._message
+        elif envelope._message_type == 'child_data':
+            # If we retry a job, we assign a new value to this
+            self.child_data = envelope._message
         else:
             raise Exception()
 
diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py
index b9acef1..3b22939 100644
--- a/buildstream/_scheduler/queues/__init__.py
+++ b/buildstream/_scheduler/queues/__init__.py
@@ -1 +1 @@
-from .queue import Queue, QueueStatus, QueueType
+from .queue import Queue, QueueStatus
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 50ba312..7f8ac9e 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -18,7 +18,8 @@
 #        Tristan Van Berkom <tr...@codethink.co.uk>
 #        Jürg Billeter <ju...@codethink.co.uk>
 
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which assembles elements
@@ -27,7 +28,7 @@ class BuildQueue(Queue):
 
     action_name = "Build"
     complete_name = "Built"
-    queue_type = QueueType.BUILD
+    resources = [ResourceType.PROCESS]
 
     def process(self, element):
         element._assemble()
@@ -50,7 +51,7 @@ class BuildQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, job, element, result, success):
 
         if success:
             # Inform element in main process that assembly is done
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bdff156..265890b 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -22,7 +22,8 @@
 from ... import Consistency
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which fetches element sources
@@ -31,7 +32,7 @@ class FetchQueue(Queue):
 
     action_name = "Fetch"
     complete_name = "Fetched"
-    queue_type = QueueType.FETCH
+    resources = [ResourceType.DOWNLOAD]
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)
@@ -66,7 +67,7 @@ class FetchQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b4f5b0d..efaa59e 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -19,7 +19,8 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which pulls element artifacts
@@ -28,7 +29,7 @@ class PullQueue(Queue):
 
     action_name = "Pull"
     complete_name = "Pulled"
-    queue_type = QueueType.FETCH
+    resources = [ResourceType.UPLOAD]
 
     def process(self, element):
         # returns whether an artifact was downloaded or not
@@ -51,7 +52,7 @@ class PullQueue(Queue):
         else:
             return QueueStatus.SKIP
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 624eefd..568e053 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -19,7 +19,8 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which pushes element artifacts
@@ -28,7 +29,7 @@ class PushQueue(Queue):
 
     action_name = "Push"
     complete_name = "Pushed"
-    queue_type = QueueType.PUSH
+    resources = [ResourceType.UPLOAD]
 
     def process(self, element):
         # returns whether an artifact was uploaded or not
@@ -40,7 +41,7 @@ class PushQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index d0c4828..8ca3ac0 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -19,32 +19,20 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # System imports
+import os
 from collections import deque
 from enum import Enum
 import traceback
 
 # Local imports
-from ..job import Job
+from ..jobs import ElementJob
+from ..resources import ResourceType
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
 from ..._message import Message, MessageType
 
 
-# Indicates the kind of activity
-#
-#
-class QueueType():
-    # Tasks which download stuff from the internet
-    FETCH = 1
-
-    # CPU/Disk intensive tasks
-    BUILD = 2
-
-    # Tasks which upload stuff to the internet
-    PUSH = 3
-
-
 # Queue status for a given element
 #
 #
@@ -69,14 +57,13 @@ class Queue():
     # These should be overridden on class data of of concrete Queue implementations
     action_name = None
     complete_name = None
-    queue_type = None
+    resources = []                     # Resources this queues' jobs want
 
     def __init__(self, scheduler):
 
         #
         # Public members
         #
-        self.active_jobs = []          # List of active ongoing Jobs, for scheduler observation
         self.failed_elements = []      # List of failed elements, for the frontend
         self.processed_elements = []   # List of processed elements, for the frontend
         self.skipped_elements = []     # List of skipped elements, for the frontend
@@ -88,13 +75,13 @@ class Queue():
         self._wait_queue = deque()
         self._done_queue = deque()
         self._max_retries = 0
-        if self.queue_type == QueueType.FETCH or self.queue_type == QueueType.PUSH:
-            self._max_retries = scheduler.context.sched_network_retries
 
         # Assert the subclass has setup class data
         assert self.action_name is not None
         assert self.complete_name is not None
-        assert self.queue_type is not None
+
+        if ResourceType.UPLOAD in self.resources or ResourceType.DOWNLOAD in self.resources:
+            self._max_retries = scheduler.context.sched_network_retries
 
     #####################################################
     #     Abstract Methods for Queue implementations    #
@@ -143,6 +130,7 @@ class Queue():
     # Abstract method for handling a successful job completion.
     #
     # Args:
+    #    job (Job): The job which completed processing
     #    element (Element): The element which completed processing
     #    result (any): The return value of the process() implementation
     #    success (bool): True if the process() implementation did not
@@ -152,7 +140,7 @@ class Queue():
     #    (bool): True if the element should appear to be processsed,
     #            Otherwise False will count the element as "skipped"
     #
-    def done(self, element, result, success):
+    def done(self, job, element, result, success):
         pass
 
     #####################################################
@@ -170,10 +158,22 @@ class Queue():
         if not elts:
             return
 
+        # Note: The internal lists work with jobs. This is not
+        #       reflected in any external methods (except
+        #       pop/peek_ready_jobs).
+        def create_job(element):
+            logfile = self._element_log_path(element)
+            return ElementJob(self._scheduler, self.action_name,
+                              logfile, element=element, queue=self,
+                              resources=self.resources,
+                              action_cb=self.process,
+                              complete_cb=self._job_done,
+                              max_retries=self._max_retries)
+
         # Place skipped elements directly on the done queue
-        elts = list(elts)
-        skip = [elt for elt in elts if self.status(elt) == QueueStatus.SKIP]
-        wait = [elt for elt in elts if elt not in skip]
+        jobs = [create_job(elt) for elt in elts]
+        skip = [job for job in jobs if self.status(job.element) == QueueStatus.SKIP]
+        wait = [job for job in jobs if job not in skip]
 
         self._wait_queue.extend(wait)
         self._done_queue.extend(skip)
@@ -189,7 +189,7 @@ class Queue():
     #
     def dequeue(self):
         while self._done_queue:
-            yield self._done_queue.popleft()
+            yield self._done_queue.popleft().element
 
     # dequeue_ready()
     #
@@ -201,7 +201,10 @@ class Queue():
     def dequeue_ready(self):
         return any(self._done_queue)
 
-    # process_ready()
+    # pop_ready_jobs()
+    #
+    # Returns:
+    #     ([Job]): A list of jobs to run
     #
     # Process elements in the queue, moving elements which were enqueued
     # into the dequeue pool, and processing them if necessary.
@@ -211,46 +214,45 @@ class Queue():
     #
     #   o Elements which are QueueStatus.WAIT will not be effected
     #
-    #   o Elements which are QueueStatus.READY will be processed
-    #     and added to the Queue.active_jobs list as a result,
-    #     given that the scheduler allows the Queue enough tokens
-    #     for the given queue's job type
-    #
     #   o Elements which are QueueStatus.SKIP will move directly
     #     to the dequeue pool
     #
-    def process_ready(self):
-        scheduler = self._scheduler
+    #   o For Elements which are QueueStatus.READY a Job will be
+    #     created and returned to the caller, given that the scheduler
+    #     allows the Queue enough resources for the given job
+    #
+    def pop_ready_jobs(self):
         unready = []
+        ready = []
 
-        while self._wait_queue and scheduler.get_job_token(self.queue_type):
-            element = self._wait_queue.popleft()
+        while self._wait_queue:
+            job = self._wait_queue.popleft()
+            element = job.element
 
             status = self.status(element)
             if status == QueueStatus.WAIT:
-                scheduler.put_job_token(self.queue_type)
-                unready.append(element)
+                unready.append(job)
                 continue
             elif status == QueueStatus.SKIP:
-                scheduler.put_job_token(self.queue_type)
-                self._done_queue.append(element)
+                self._done_queue.append(job)
                 self.skipped_elements.append(element)
                 continue
 
             self.prepare(element)
-
-            job = Job(scheduler, element, self.action_name,
-                      self.process, self._job_done,
-                      max_retries=self._max_retries)
-            scheduler.job_starting(job)
-
-            job.spawn()
-            self.active_jobs.append(job)
+            ready.append(job)
 
         # These were not ready but were in the beginning, give em
         # first priority again next time around
         self._wait_queue.extendleft(unready)
 
+        return ready
+
+    def peek_ready_jobs(self):
+        def ready(job):
+            return self.status(job.element) == QueueStatus.READY
+
+        yield from (job for job in self._wait_queue if ready(job))
+
     #####################################################
     #                 Private Methods                   #
     #####################################################
@@ -265,12 +267,16 @@ class Queue():
     #    job (Job): The job which completed
     #
     def _update_workspaces(self, element, job):
+        workspace_dict = None
+        if job.child_data:
+            workspace_dict = job.child_data.get('workspace', None)
+
         # Handle any workspace modifications now
         #
-        if job.workspace_dict:
+        if workspace_dict:
             context = element._get_context()
             workspaces = context.get_workspaces()
-            if workspaces.update_workspace(element._get_full_name(), job.workspace_dict):
+            if workspaces.update_workspace(element._get_full_name(), workspace_dict):
                 try:
                     workspaces.save_config()
                 except BstError as e:
@@ -291,17 +297,15 @@ class Queue():
     #
     def _job_done(self, job, element, success, result):
 
-        # Remove from our jobs
-        self.active_jobs.remove(job)
-
-        # Update workspaces in the main task before calling any queue implementation
+        # Update values that need to be synchronized in the main task
+        # before calling any queue implementation
         self._update_workspaces(element, job)
 
         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
         # or skipped.
         try:
-            processed = self.done(element, result, success)
+            processed = self.done(job, element, result, success)
 
         except BstError as e:
 
@@ -330,7 +334,7 @@ class Queue():
             # No exception occured, handle the success/failure state in the normal way
             #
             if success:
-                self._done_queue.append(element)
+                self._done_queue.append(job)
                 if processed:
                     self.processed_elements.append(element)
                 else:
@@ -338,18 +342,22 @@ class Queue():
             else:
                 self.failed_elements.append(element)
 
-        # Give the token for this job back to the scheduler
-        # immediately before invoking another round of scheduling
-        self._scheduler.put_job_token(self.queue_type)
-
-        # Notify frontend
-        self._scheduler.job_completed(self, job, success)
-
-        self._scheduler.sched()
-
     # Convenience wrapper for Queue implementations to send
     # a message for the element they are processing
     def _message(self, element, message_type, brief, **kwargs):
         context = element._get_context()
         message = Message(element._get_unique_id(), message_type, brief, **kwargs)
         context.message(message)
+
+    def _element_log_path(self, element):
+        project = element._get_project()
+        context = element._get_context()
+
+        key = element._get_display_key()[1]
+        action = self.action_name.lower()
+        logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
+
+        directory = os.path.join(context.logdir, project.name, element.normal_name)
+
+        os.makedirs(directory, exist_ok=True)
+        return os.path.join(directory, logfile)
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 3a65f01..c7a8f4c 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -23,7 +23,8 @@ from ...plugin import _plugin_lookup
 from ... import SourceError
 
 # Local imports
-from . import Queue, QueueStatus, QueueType
+from . import Queue, QueueStatus
+from ..resources import ResourceType
 
 
 # A queue which tracks sources
@@ -32,7 +33,7 @@ class TrackQueue(Queue):
 
     action_name = "Track"
     complete_name = "Tracked"
-    queue_type = QueueType.FETCH
+    resources = [ResourceType.DOWNLOAD]
 
     def process(self, element):
         return element._track()
@@ -47,7 +48,7 @@ class TrackQueue(Queue):
 
         return QueueStatus.READY
 
-    def done(self, element, result, success):
+    def done(self, _, element, result, success):
 
         if not success:
             return False
diff --git a/buildstream/_scheduler/resources.py b/buildstream/_scheduler/resources.py
new file mode 100644
index 0000000..bbf851b
--- /dev/null
+++ b/buildstream/_scheduler/resources.py
@@ -0,0 +1,105 @@
+class ResourceType():
+    CACHE = 0
+    DOWNLOAD = 1
+    PROCESS = 2
+    UPLOAD = 3
+
+
+class Resources():
+    def __init__(self, num_builders, num_fetchers, num_pushers):
+        self._max_resources = {
+            ResourceType.CACHE: 1,
+            ResourceType.DOWNLOAD: num_fetchers,
+            ResourceType.PROCESS: num_builders,
+            ResourceType.UPLOAD: num_pushers
+        }
+
+        # Resources jobs are currently using.
+        self._used_resources = {
+            ResourceType.CACHE: 0,
+            ResourceType.DOWNLOAD: 0,
+            ResourceType.PROCESS: 0,
+            ResourceType.UPLOAD: 0
+        }
+
+        # Resources jobs currently want exclusive access to. The set
+        # of jobs that have asked for exclusive access is the value -
+        # this is so that we can avoid scheduling any other jobs until
+        # *all* exclusive jobs that "register interest" have finished
+        # - which avoids starving them of scheduling time.
+        self._exclusive_resources = {
+            ResourceType.CACHE: set(),
+            ResourceType.DOWNLOAD: set(),
+            ResourceType.PROCESS: set(),
+            ResourceType.UPLOAD: set()
+        }
+
+    def clear_job_resources(self, job):
+        for resource in job.exclusive_resources:
+            self._exclusive_resources[resource].remove(hash(job))
+
+        for resource in job.resources:
+            self._used_resources[resource] -= 1
+
+    def reserve_exclusive_resources(self, job):
+        exclusive = job.exclusive_resources
+
+        # The very first thing we do is to register any exclusive
+        # resources this job may want. Even if the job is not yet
+        # allowed to run (because another job is holding the resource
+        # it wants), we can still set this - it just means that any
+        # job *currently* using these resources has to finish first,
+        # and no new jobs wanting these can be launched (except other
+        # exclusive-access jobs).
+        #
+        for resource in exclusive:
+            self._exclusive_resources[resource].add(hash(job))
+
+    def reserve_job_resources(self, job):
+        # First, we check if the job wants to access a resource that
+        # another job wants exclusive access to. If so, it cannot be
+        # scheduled.
+        #
+        # Note that if *both* jobs want this exclusively, we don't
+        # fail yet.
+        #
+        # FIXME: I *think* we can deadlock if two jobs want disjoint
+        #        sets of exclusive and non-exclusive resources. This
+        #        is currently not possible, but may be worth thinking
+        #        about.
+        #
+        for resource in job.resources - job.exclusive_resources:
+            # If our job wants this resource exclusively, we never
+            # check this, so we can get away with not (temporarily)
+            # removing it from the set.
+            if self._exclusive_resources[resource]:
+                return False
+
+        # Now we check if anything is currently using any resources
+        # this job wants exclusively. If so, the job cannot be
+        # scheduled.
+        #
+        # Since jobs that use a resource exclusively are also using
+        # it, this means only one exclusive job can ever be scheduled
+        # at a time, despite being allowed to be part of the exclusive
+        # set.
+        #
+        for exclusive in job.exclusive_resources:
+            if self._used_resources[exclusive] != 0:
+                return False
+
+        # Finally, we check if we have enough of each resource
+        # available. If we don't have enough, the job cannot be
+        # scheduled.
+        for resource in job.resources:
+            if (self._max_resources[resource] > 0 and
+                    self._used_resources[resource] >= self._max_resources[resource]):
+                return False
+
+        # Now we register the fact that our job is using the resources
+        # it asked for, and tell the scheduler that it is allowed to
+        # continue.
+        for resource in job.resources:
+            self._used_resources[resource] += 1
+
+        return True
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 7bfbc95..bc182db 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -21,12 +21,13 @@
 # System imports
 import os
 import asyncio
+from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
 
 # Local imports
-from .queues import QueueType
+from .resources import Resources
 
 
 # A decent return code for Scheduler.run()
@@ -69,6 +70,8 @@ class Scheduler():
         #
         # Public members
         #
+        self.active_jobs = []       # Jobs currently being run in the scheduler
+        self.waiting_jobs = []      # Jobs waiting for resources
         self.queues = None          # Exposed for the frontend to print summaries
         self.context = context      # The Context object shared with Queues
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
@@ -90,13 +93,9 @@ class Scheduler():
         self._suspendtime = None
         self._queue_jobs = True      # Whether we should continue to queue jobs
 
-        # Initialize task tokens with the number allowed by
-        # the user configuration
-        self._job_tokens = {
-            QueueType.FETCH: context.sched_fetchers,
-            QueueType.BUILD: context.sched_builders,
-            QueueType.PUSH: context.sched_pushers
-        }
+        self._resources = Resources(context.sched_builders,
+                                    context.sched_fetchers,
+                                    context.sched_pushers)
 
     # run()
     #
@@ -129,7 +128,7 @@ class Scheduler():
         self._connect_signals()
 
         # Run the queues
-        self.sched()
+        self._schedule_queue_jobs()
         self.loop.run_forever()
         self.loop.close()
 
@@ -209,18 +208,74 @@ class Scheduler():
             starttime = timenow
         return timenow - starttime
 
-    # sched()
+    # schedule_jobs()
+    #
+    # Args:
+    #     jobs ([Job]): A list of jobs to schedule
+    #
+    # Schedule 'Job's for the scheduler to run. Jobs scheduled will be
+    # run as soon any other queueing jobs finish, provided sufficient
+    # resources are available for them to run
+    #
+    def schedule_jobs(self, jobs):
+        for job in jobs:
+            self.waiting_jobs.append(job)
+
+    # job_completed():
+    #
+    # Called when a Job completes
+    #
+    # Args:
+    #    queue (Queue): The Queue holding a complete job
+    #    job (Job): The completed Job
+    #    success (bool): Whether the Job completed with a success status
+    #
+    def job_completed(self, job, success):
+        self._resources.clear_job_resources(job)
+        self.active_jobs.remove(job)
+        self._job_complete_callback(job, success)
+        self._schedule_queue_jobs()
+        self._sched()
+
+    #######################################################
+    #                  Local Private Methods              #
+    #######################################################
+
+    # _sched()
     #
     # The main driving function of the scheduler, it will be called
-    # automatically when Scheduler.run() is called initially, and needs
-    # to be called whenever a job can potentially be scheduled, usually
-    # when a Queue completes handling of a job.
+    # automatically when Scheduler.run() is called initially,
+    #
+    def _sched(self):
+        for job in self.waiting_jobs:
+            self._resources.reserve_exclusive_resources(job)
+
+        for job in self.waiting_jobs:
+            if not self._resources.reserve_job_resources(job):
+                continue
+
+            job.spawn()
+            self.waiting_jobs.remove(job)
+            self.active_jobs.append(job)
+
+            if self._job_start_callback:
+                self._job_start_callback(job)
+
+        # If nothings ticking, time to bail out
+        if not self.active_jobs and not self.waiting_jobs:
+            self.loop.stop()
+
+    # _schedule_queue_jobs()
     #
-    # This will process the Queues and pull elements through the Queues
+    # Ask the queues what jobs they want to schedule and schedule
+    # them. This is done here so we can ask for new jobs when jobs
+    # from previous queues become available.
+    #
+    # This will process the Queues, pull elements through the Queues
     # and process anything that is ready.
     #
-    def sched(self):
-
+    def _schedule_queue_jobs(self):
+        ready = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
@@ -233,90 +288,29 @@ class Scheduler():
 
                 # Dequeue processed elements for the next queue
                 elements = list(queue.dequeue())
-                elements = list(elements)
 
             # Kickoff whatever processes can be processed at this time
             #
-            # We start by queuing from the last queue first, because we want to
-            # give priority to queues later in the scheduling process in the case
-            # that multiple queues share the same token type.
+            # We start by queuing from the last queue first, because
+            # we want to give priority to queues later in the
+            # scheduling process in the case that multiple queues
+            # share the same token type.
             #
-            # This avoids starvation situations where we dont move on to fetch
-            # tasks for elements which failed to pull, and thus need all the pulls
-            # to complete before ever starting a build
-            for queue in reversed(self.queues):
-                queue.process_ready()
-
-            # process_ready() may have skipped jobs, adding them to the done_queue.
-            # Pull these skipped elements forward to the next queue and process them.
+            # This avoids starvation situations where we dont move on
+            # to fetch tasks for elements which failed to pull, and
+            # thus need all the pulls to complete before ever starting
+            # a build
+            ready.extend(chain.from_iterable(
+                queue.pop_ready_jobs() for queue in reversed(self.queues)
+            ))
+
+            # pop_ready_jobs() may have skipped jobs, adding them to
+            # the done_queue.  Pull these skipped elements forward to
+            # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        # If nothings ticking, time to bail out
-        ticking = 0
-        for queue in self.queues:
-            ticking += len(queue.active_jobs)
-
-        if ticking == 0:
-            self.loop.stop()
-
-    # get_job_token():
-    #
-    # Used by the Queue object to obtain a token for
-    # processing a Job, if a Queue does not receive a token
-    # then it must wait until a later time in order to
-    # process pending jobs.
-    #
-    # Args:
-    #    queue_type (QueueType): The type of token to obtain
-    #
-    # Returns:
-    #    (bool): Whether a token was handed out or not
-    #
-    def get_job_token(self, queue_type):
-        if self._job_tokens[queue_type] > 0:
-            self._job_tokens[queue_type] -= 1
-            return True
-        return False
-
-    # put_job_token():
-    #
-    # Return a job token to the scheduler. Tokens previously
-    # received with get_job_token() must be returned to
-    # the scheduler once the associated job is complete.
-    #
-    # Args:
-    #    queue_type (QueueType): The type of token to obtain
-    #
-    def put_job_token(self, queue_type):
-        self._job_tokens[queue_type] += 1
-
-    # job_starting():
-    #
-    # Called by the Queue when starting a Job
-    #
-    # Args:
-    #    job (Job): The starting Job
-    #
-    def job_starting(self, job):
-        if self._job_start_callback:
-            self._job_start_callback(job.element, job.action_name)
-
-    # job_completed():
-    #
-    # Called by the Queue when a Job completes
-    #
-    # Args:
-    #    queue (Queue): The Queue holding a complete job
-    #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
-    #
-    def job_completed(self, queue, job, success):
-        if self._job_complete_callback:
-            self._job_complete_callback(job.element, queue, job.action_name, success)
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
+        self.schedule_jobs(ready)
+        self._sched()
 
     # _suspend_jobs()
     #
@@ -326,9 +320,8 @@ class Scheduler():
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.suspend()
+            for job in self.active_jobs:
+                job.suspend()
 
     # _resume_jobs()
     #
@@ -336,9 +329,8 @@ class Scheduler():
     #
     def _resume_jobs(self):
         if self.suspended:
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.resume()
+            for job in self.active_jobs:
+                job.resume()
             self.suspended = False
             self._starttime += (datetime.datetime.now() - self._suspendtime)
             self._suspendtime = None
@@ -401,19 +393,18 @@ class Scheduler():
         wait_limit = 20.0
 
         # First tell all jobs to terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                job.terminate()
+        for job in self.active_jobs:
+            job.terminate()
 
         # Now wait for them to really terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                elapsed = datetime.datetime.now() - wait_start
-                timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
-                if not job.terminate_wait(timeout):
-                    job.kill()
-
-        self.loop.stop()
+        for job in self.active_jobs:
+            elapsed = datetime.datetime.now() - wait_start
+            timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+            if not job.terminate_wait(timeout):
+                job.kill()
+
+        # Clear out the waiting jobs
+        self.waiting_jobs = []
 
     # Regular timeout for driving status in the UI
     def _tick(self):


[buildstream] 01/05: Bump required python version to 3.5

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch 135-expire-artifacts-in-local-cache-clean
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 1cffee6cbc521811a1e496927656e02f24f99cdb
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Mon Jul 16 14:20:05 2018 +0100

    Bump required python version to 3.5
---
 .gitlab-ci.yml              | 6 +-----
 NEWS                        | 1 +
 doc/source/install_main.rst | 2 +-
 setup.py                    | 4 ++--
 4 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 3c65546..d19a9d6 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -1,4 +1,4 @@
-image: buildstream/testsuite-debian:8-master-57-be5a863
+image: buildstream/buildstream-debian:master-81-caa5241
 
 cache:
   key: "$CI_JOB_NAME-"
@@ -88,9 +88,6 @@ source_dist:
     paths:
     - coverage-linux/
 
-tests-debian-8:
-  <<: *linux-tests
-
 tests-debian-9:
   image: buildstream/buildstream-debian:master-81-caa5241
   <<: *linux-tests
@@ -223,7 +220,6 @@ coverage:
     - coverage combine --rcfile=../.coveragerc -a coverage.*
     - coverage report --rcfile=../.coveragerc -m
   dependencies:
-  - tests-debian-8
   - tests-debian-9
   - tests-fedora-27
   - tests-unix
diff --git a/NEWS b/NEWS
index 0025e3d..2d0c634 100644
--- a/NEWS
+++ b/NEWS
@@ -14,6 +14,7 @@ buildstream 1.1.4
 buildstream 1.1.3
 =================
 
+  o BuildStream now requires python version >= 3.5
   o Added new `bst init` command to initialize a new project.
 
   o Cross junction tracking is now disabled by default for projects
diff --git a/doc/source/install_main.rst b/doc/source/install_main.rst
index abe85ee..1f542f2 100644
--- a/doc/source/install_main.rst
+++ b/doc/source/install_main.rst
@@ -6,7 +6,7 @@ Installing BuildStream
 ======================
 BuildStream requires the following base system requirements:
 
-* python3 >= 3.4
+* python3 >= 3.5
 * libostree >= v2017.8 with introspection data
 * bubblewrap >= 0.1.2
 * fuse2
diff --git a/setup.py b/setup.py
index 1f9ff00..46a86e2 100755
--- a/setup.py
+++ b/setup.py
@@ -24,8 +24,8 @@ import subprocess
 import sys
 import versioneer
 
-if sys.version_info[0] != 3 or sys.version_info[1] < 4:
-    print("BuildStream requires Python >= 3.4")
+if sys.version_info[0] != 3 or sys.version_info[1] < 5:
+    print("BuildStream requires Python >= 3.5")
     sys.exit(1)
 
 try:


[buildstream] 02/05: _exceptions.py: Add `detail` to ArtifactErrors

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch 135-expire-artifacts-in-local-cache-clean
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 47b6b6e6a5908c79fa4d46b2d3bbfab42bae0e2f
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Mon Jul 16 14:22:10 2018 +0100

    _exceptions.py: Add `detail` to ArtifactErrors
---
 buildstream/_exceptions.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/buildstream/_exceptions.py b/buildstream/_exceptions.py
index 3aadd52..96d634b 100644
--- a/buildstream/_exceptions.py
+++ b/buildstream/_exceptions.py
@@ -246,8 +246,8 @@ class SandboxError(BstError):
 # Raised when errors are encountered in the artifact caches
 #
 class ArtifactError(BstError):
-    def __init__(self, message, reason=None):
-        super().__init__(message, domain=ErrorDomain.ARTIFACT, reason=reason)
+    def __init__(self, message, *, detail=None, reason=None):
+        super().__init__(message, detail=detail, domain=ErrorDomain.ARTIFACT, reason=reason)
 
 
 # PipelineError


[buildstream] 05/05: WIP: Perform artifact cache expiry

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch 135-expire-artifacts-in-local-cache-clean
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a2d775821d3734cd08f94d06989eb266c87123f2
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Mon Jul 16 17:56:46 2018 +0100

    WIP: Perform artifact cache expiry
---
 NEWS                                        |   3 +
 buildstream/_artifactcache/artifactcache.py | 179 ++++++++++++++++++-
 buildstream/_artifactcache/ostreecache.py   |  35 ++++
 buildstream/_artifactcache/tarcache.py      |  59 +++++++
 buildstream/_context.py                     |  55 ++++++
 buildstream/_ostree.py                      |   2 +
 buildstream/_pipeline.py                    |   2 +
 buildstream/_scheduler/jobs/cachesizejob.py |  91 ++++++++++
 buildstream/_scheduler/jobs/cleanupjob.py   |  72 ++++++++
 buildstream/_scheduler/queues/buildqueue.py |  17 ++
 buildstream/_scheduler/queues/pullqueue.py  |   5 +
 buildstream/_scheduler/queues/queue.py      |   2 +
 buildstream/_scheduler/scheduler.py         |  24 ++-
 buildstream/data/userconfig.yaml            |   8 +
 buildstream/element.py                      |  31 ++++
 buildstream/utils.py                        |  84 ++++++++-
 tests/artifactcache/expiry.py               | 264 ++++++++++++++++++++++++++++
 tests/artifactcache/expiry/project.conf     |  14 ++
 tests/frontend/push.py                      |  95 ++++++----
 19 files changed, 1001 insertions(+), 41 deletions(-)

diff --git a/NEWS b/NEWS
index 2d0c634..690fdb3 100644
--- a/NEWS
+++ b/NEWS
@@ -16,6 +16,9 @@ buildstream 1.1.3
 
   o BuildStream now requires python version >= 3.5
   o Added new `bst init` command to initialize a new project.
+  o BuildStream will now automatically clean up old artifacts when it
+    runs out of space. The exact behavior is configurable in
+    `userconf.yaml`.
 
   o Cross junction tracking is now disabled by default for projects
     which can support this by using project.refs ref-storage
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 2d745f8..bf8ff4a 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -21,7 +21,8 @@ import os
 import string
 from collections import Mapping, namedtuple
 
-from .._exceptions import ImplError, LoadError, LoadErrorReason
+from ..element import _KeyStrength
+from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
 from .._message import Message, MessageType
 from .. import utils
 from .. import _yaml
@@ -61,7 +62,11 @@ class ArtifactCacheSpec(namedtuple('ArtifactCacheSpec', 'url push')):
 class ArtifactCache():
     def __init__(self, context):
         self.context = context
+        self.required_artifacts = set()
         self.extractdir = os.path.join(context.artifactdir, 'extract')
+        self.max_size = context.cache_quota
+        self.estimated_size = None
+
         self.global_remote_specs = []
         self.project_remote_specs = {}
 
@@ -162,6 +167,102 @@ class ArtifactCache():
                                   (str(provenance)))
         return cache_specs
 
+    # append_required_artifacts():
+    #
+    # Append to the list of elements whose artifacts are required for
+    # the current run. Artifacts whose elements are in this list will
+    # be locked by the artifact cache and not touched for the duration
+    # of the current pipeline.
+    #
+    # Args:
+    #     elements (iterable): A set of elements to mark as required
+    #
+    def append_required_artifacts(self, elements):
+        # We lock both strong and weak keys - deleting one but not the
+        # other won't save space in most cases anyway, but would be a
+        # user inconvenience.
+
+        for element in elements:
+            strong_key = element._get_cache_key(strength=_KeyStrength.STRONG)
+            weak_key = element._get_cache_key(strength=_KeyStrength.WEAK)
+
+            for key in (strong_key, weak_key):
+                if key and key not in self.required_artifacts:
+                    self.required_artifacts.add(key)
+
+                    # We also update the usage times of any artifacts
+                    # we will be using, which helps preventing a
+                    # buildstream process that runs in parallel with
+                    # this one from removing artifacts in-use.
+                    try:
+                        self.update_atime(element, key)
+                    except FileNotFoundError:
+                        pass
+
+    # clean():
+    #
+    # Clean the artifact cache as much as possible.
+    #
+    def clean(self):
+        artifacts = self.list_artifacts()
+
+        while self.calculate_cache_size() >= self.context.cache_quota - self.context.cache_lower_threshold:
+            try:
+                to_remove = artifacts.pop(0)
+            except IndexError:
+                # If too many artifacts are required, and we therefore
+                # can't remove them, we have to abort the build.
+                #
+                # FIXME: Asking the user what to do may be neater
+                default_conf = os.path.join(os.environ['XDG_CONFIG_HOME'],
+                                            'buildstream.conf')
+                detail = ("There is not enough space to build the given element.\n"
+                          "Please increase the cache-quota in {}."
+                          .format(self.context.config_origin or default_conf))
+
+                if self.calculate_cache_size() > self.context.cache_quota:
+                    raise ArtifactError("Cache too full. Aborting.",
+                                        detail=detail,
+                                        reason="cache-too-full")
+                else:
+                    break
+
+            key = to_remove.rpartition('/')[2]
+            if key not in self.required_artifacts:
+                self.remove(to_remove)
+
+        # This should be O(1) if implemented correctly
+        return self.calculate_cache_size()
+
+    # get_approximate_cache_size()
+    #
+    # A cheap method that aims to serve as an upper limit on the
+    # artifact cache size.
+    #
+    # The cache size reported by this function will normally be larger
+    # than the real cache size, since it is calculated using the
+    # pre-commit artifact size, but for very small artifacts in
+    # certain caches additional overhead could cause this to be
+    # smaller than, but close to, the actual size.
+    #
+    # Nonetheless, in practice this should be safe to use as an upper
+    # limit on the cache size.
+    #
+    # If the cache has built-in constant-time size reporting, please
+    # feel free to override this method with a more accurate
+    # implementation.
+    #
+    # Returns:
+    #     (int) An approximation of the artifact cache size.
+    #
+    def get_approximate_cache_size(self):
+        # If we don't currently have an estimate, figure out the real
+        # cache size.
+        if self.estimated_size is None:
+            self.estimated_size = self.calculate_cache_size()
+
+        return self.estimated_size
+
     ################################################
     # Abstract methods for subclasses to implement #
     ################################################
@@ -191,6 +292,44 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement contains()"
                         .format(kind=type(self).__name__))
 
+    # list_artifacts():
+    #
+    # List artifacts in this cache in LRU order.
+    #
+    # Returns:
+    #     ([str]) - A list of artifact names as generated by
+    #               `ArtifactCache.get_artifact_fullname` in LRU order
+    #
+    def list_artifacts(self):
+        raise ImplError("Cache '{kind}' does not implement list_artifacts()"
+                        .format(kind=type(self).__name__))
+
+    # remove():
+    #
+    # Removes the artifact for the specified ref from the local
+    # artifact cache.
+    #
+    # Args:
+    #     ref (artifact_name): The name of the artifact to remove (as
+    #                          generated by
+    #                          `ArtifactCache.get_artifact_fullname`)
+    #
+    def remove(self, artifact_name):
+        raise ImplError("Cache '{kind}' does not implement remove()"
+                        .format(kind=type(self).__name__))
+
+    # update_atime():
+    #
+    # Update the access time of an element.
+    #
+    # Args:
+    #     element (Element): The Element to mark
+    #     key (str): The cache key to use
+    #
+    def update_atime(self, element, key):
+        raise ImplError("Cache '{kind}' does not implement update_atime()"
+                        .format(kind=type(self).__name__))
+
     # extract():
     #
     # Extract cached artifact for the specified Element if it hasn't
@@ -320,6 +459,20 @@ class ArtifactCache():
         raise ImplError("Cache '{kind}' does not implement link_key()"
                         .format(kind=type(self).__name__))
 
+    # calculate_cache_size()
+    #
+    # Return the real artifact cache size.
+    #
+    # Implementations should also use this to update estimated_size.
+    #
+    # Returns:
+    #
+    # (int) The size of the artifact cache.
+    #
+    def calculate_cache_size(self):
+        raise ImplError("Cache '{kind}' does not implement calculate_cache_size()"
+                        .format(kind=type(self).__name__))
+
     ################################################
     #               Local Private Methods          #
     ################################################
@@ -361,6 +514,30 @@ class ArtifactCache():
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
 
+    # _add_artifact_size()
+    #
+    # Since we cannot keep track of the cache size between threads,
+    # this method will be called by the main process every time a
+    # process that added something to the cache finishes.
+    #
+    # This will then add the reported size to
+    # ArtifactCache.estimated_size.
+    #
+    def _add_artifact_size(self, artifact_size):
+        if not self.estimated_size:
+            self.estimated_size = self.calculate_cache_size()
+
+        self.estimated_size += artifact_size
+
+    # _set_cache_size()
+    #
+    # Similarly to the above method, when we calculate the actual size
+    # in a child thread, we can't update it. We instead pass the value
+    # back to the main thread and update it there.
+    #
+    def _set_cache_size(self, cache_size):
+        self.estimated_size = cache_size
+
 
 # _configured_remote_artifact_cache_specs():
 #
diff --git a/buildstream/_artifactcache/ostreecache.py b/buildstream/_artifactcache/ostreecache.py
index 707a974..bb8f4fc 100644
--- a/buildstream/_artifactcache/ostreecache.py
+++ b/buildstream/_artifactcache/ostreecache.py
@@ -59,6 +59,9 @@ class OSTreeCache(ArtifactCache):
         self._has_fetch_remotes = False
         self._has_push_remotes = False
 
+        # A cached artifact cache size (irony?)
+        self.cache_size = None
+
     ################################################
     #     Implementation of abstract methods       #
     ################################################
@@ -90,6 +93,21 @@ class OSTreeCache(ArtifactCache):
         ref = self.get_artifact_fullname(element, key)
         return _ostree.exists(self.repo, ref)
 
+    def list_artifacts(self):
+        return _ostree.list_artifacts(self.repo)
+
+    def remove(self, artifact_name):
+        # We cannot defer pruning, unfortunately, because we could
+        # otherwise not figure out how much space was freed by the
+        # removal, and would therefore not be able to expire the
+        # correct number of artifacts.
+        self.cache_size -= _ostree.remove(self.repo, artifact_name, defer_prune=False)
+
+    def update_atime(self, element, key):
+        ref = self.get_artifact_fullname(element, key)
+        ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
+        os.utime(ref_file)
+
     def extract(self, element, key):
         ref = self.get_artifact_fullname(element, key)
 
@@ -99,6 +117,9 @@ class OSTreeCache(ArtifactCache):
         # Extracting a nonexistent artifact is a bug
         assert rev, "Artifact missing for {}".format(ref)
 
+        ref_file = os.path.join(self.repo.get_path().get_path(), 'refs', 'heads', ref)
+        os.utime(ref_file)
+
         dest = os.path.join(self.extractdir, element._get_project().name, element.normal_name, rev)
         if os.path.isdir(dest):
             # artifact has already been extracted
@@ -134,6 +155,10 @@ class OSTreeCache(ArtifactCache):
         except OSTreeError as e:
             raise ArtifactError("Failed to commit artifact: {}".format(e)) from e
 
+        self.append_required_artifacts([element])
+
+        self.cache_size = None
+
     def can_diff(self):
         return True
 
@@ -167,6 +192,9 @@ class OSTreeCache(ArtifactCache):
                 # fetch the artifact from highest priority remote using the specified cache key
                 remote_name = self._ensure_remote(self.repo, remote.pull_url)
                 _ostree.fetch(self.repo, remote=remote_name, ref=ref, progress=progress)
+
+                self.append_required_artifacts([element])
+
                 return True
             except OSTreeError:
                 # Try next remote
@@ -201,6 +229,13 @@ class OSTreeCache(ArtifactCache):
 
         return any_pushed
 
+    def calculate_cache_size(self):
+        if self.cache_size is None:
+            self.cache_size = utils._get_dir_size(self.repo.get_path().get_path())
+            self.estimated_size = self.cache_size
+
+        return self.cache_size
+
     def initialize_remotes(self, *, on_failure=None):
         remote_specs = self.global_remote_specs.copy()
 
diff --git a/buildstream/_artifactcache/tarcache.py b/buildstream/_artifactcache/tarcache.py
index ab814ab..4e9f5f9 100644
--- a/buildstream/_artifactcache/tarcache.py
+++ b/buildstream/_artifactcache/tarcache.py
@@ -36,6 +36,7 @@ class TarCache(ArtifactCache):
 
         self.tardir = os.path.join(context.artifactdir, 'tar')
         os.makedirs(self.tardir, exist_ok=True)
+        self.cache_size = None
 
     ################################################
     #     Implementation of abstract methods       #
@@ -44,6 +45,34 @@ class TarCache(ArtifactCache):
         path = os.path.join(self.tardir, _tarpath(element, key))
         return os.path.isfile(path)
 
+    # list_artifacts():
+    #
+    # List artifacts in this cache in LRU order.
+    #
+    # Returns:
+    #     (list) - A list of refs in LRU order
+    #
+    def list_artifacts(self):
+        artifacts = list(utils.list_relative_paths(self.tardir, list_dirs=False))
+        mtimes = [os.path.getmtime(os.path.join(self.tardir, artifact))
+                  for artifact in artifacts if artifact]
+
+        # We need to get rid of the tarfile extension to get a proper
+        # ref - os.splitext doesn't do this properly, unfortunately.
+        artifacts = [artifact[:-len('.tar.bz2')] for artifact in artifacts]
+
+        return [name for _, name in sorted(zip(mtimes, artifacts))]
+
+    # remove()
+    #
+    # Implements artifactcache.remove().
+    #
+    def remove(self, artifact_name):
+        artifact = os.path.join(self.tardir, artifact_name + '.tar.bz2')
+        size = os.stat(artifact, follow_symlinks=False).st_size
+        os.remove(artifact)
+        self.cache_size -= size
+
     def commit(self, element, content, keys):
         os.makedirs(os.path.join(self.tardir, element._get_project().name, element.normal_name), exist_ok=True)
 
@@ -56,6 +85,21 @@ class TarCache(ArtifactCache):
 
                 _Tar.archive(os.path.join(self.tardir, ref), key, temp)
 
+            self.cache_size = None
+            self.append_required_artifacts([element])
+
+    # update_atime():
+    #
+    # Update the access time of an element.
+    #
+    # Args:
+    #     element (Element): The Element to mark
+    #     key (str): The cache key to use
+    #
+    def update_atime(self, element, key):
+        path = _tarpath(element, key)
+        os.utime(os.path.join(self.tardir, path))
+
     def extract(self, element, key):
 
         fullname = self.get_artifact_fullname(element, key)
@@ -90,6 +134,21 @@ class TarCache(ArtifactCache):
 
         return dest
 
+    # get_cache_size()
+    #
+    # Return the artifact cache size.
+    #
+    # Returns:
+    #
+    # (int) The size of the artifact cache.
+    #
+    def calculate_cache_size(self):
+        if self.cache_size is None:
+            self.cache_size = utils._get_dir_size(self.tardir)
+            self.estimated_size = self.cache_size
+
+        return self.cache_size
+
 
 # _tarpath()
 #
diff --git a/buildstream/_context.py b/buildstream/_context.py
index 1a59af2..5cc7f43 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -21,6 +21,7 @@ import os
 import datetime
 from collections import deque, Mapping
 from contextlib import contextmanager
+from . import utils
 from . import _cachekey
 from . import _signals
 from . import _site
@@ -62,6 +63,12 @@ class Context():
         # The locations from which to push and pull prebuilt artifacts
         self.artifact_cache_specs = []
 
+        # The artifact cache quota
+        self.cache_quota = None
+
+        # The lower threshold to which we aim to reduce the cache size
+        self.cache_lower_threshold = None
+
         # The directory to store build logs
         self.logdir = None
 
@@ -153,6 +160,7 @@ class Context():
         _yaml.node_validate(defaults, [
             'sourcedir', 'builddir', 'artifactdir', 'logdir',
             'scheduler', 'artifacts', 'logging', 'projects',
+            'cache'
         ])
 
         for directory in ['sourcedir', 'builddir', 'artifactdir', 'logdir']:
@@ -165,6 +173,53 @@ class Context():
             path = os.path.normpath(path)
             setattr(self, directory, path)
 
+        # Load quota configuration
+        # We need to find the first existing directory in the path of
+        # our artifactdir - the artifactdir may not have been created
+        # yet.
+        cache = _yaml.node_get(defaults, Mapping, 'cache')
+        _yaml.node_validate(cache, ['quota'])
+
+        artifactdir_volume = self.artifactdir
+        while not os.path.exists(artifactdir_volume):
+            artifactdir_volume = os.path.dirname(artifactdir_volume)
+
+        # We read and parse the cache quota as specified by the user
+        cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
+        try:
+            cache_quota = utils._parse_size(cache_quota, artifactdir_volume)
+        except utils.UtilError as e:
+            raise LoadError(LoadErrorReason.INVALID_DATA,
+                            "{}\nPlease specify the value in bytes or as a % of full disk space.\n"
+                            "\nValid values are, for example: 800M 10G 1T 50%\n"
+                            .format(str(e))) from e
+
+        # If we are asked not to set a quota, we set it to the maximum
+        # disk space available minus a headroom of 2GB, such that we
+        # at least try to avoid raising Exceptions.
+        #
+        # Of course, we might still end up running out during a build
+        # if we end up writing more than 2G, but hey, this stuff is
+        # already really fuzzy.
+        #
+        if cache_quota is None:
+            stat = os.statvfs(artifactdir_volume)
+            # Again, the artifact directory may not yet have been
+            # created
+            if not os.path.exists(self.artifactdir):
+                cache_size = 0
+            else:
+                cache_size = utils._get_dir_size(self.artifactdir)
+            cache_quota = cache_size + stat.f_bsize * stat.f_bavail
+
+        if 'BST_TEST_SUITE' in os.environ:
+            headroom = 0
+        else:
+            headroom = 2e9
+
+        self.cache_quota = cache_quota - headroom
+        self.cache_lower_threshold = self.cache_quota / 2
+
         # Load artifact share configuration
         self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
 
diff --git a/buildstream/_ostree.py b/buildstream/_ostree.py
index 238c6b4..d060960 100644
--- a/buildstream/_ostree.py
+++ b/buildstream/_ostree.py
@@ -565,6 +565,8 @@ def list_artifacts(repo):
     ref_heads = os.path.join(repo.get_path().get_path(), 'refs', 'heads')
 
     # obtain list of <project>/<element>/<key>
+    # FIXME: ostree 2017.11+ supports a flag that would allow
+    #        listing only local refs
     refs = _list_all_refs(repo).keys()
 
     mtimes = []
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 9f4504d..7f159c7 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -159,6 +159,8 @@ class Pipeline():
                 # Determine initial element state.
                 element._update_state()
 
+            self._artifacts.append_required_artifacts((e for e in self.dependencies(targets, Scope.ALL)))
+
     # dependencies()
     #
     # Generator function to iterate over elements and optionally
diff --git a/buildstream/_scheduler/jobs/cachesizejob.py b/buildstream/_scheduler/jobs/cachesizejob.py
new file mode 100644
index 0000000..897e896
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cachesizejob.py
@@ -0,0 +1,91 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message, MessageType
+
+
+class CacheSizeJob(Job):
+    def __init__(self, *args, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete_cb = complete_cb
+        self._cache = Platform._instance.artifactcache
+
+    def _child_process(self):
+        return self._cache.calculate_cache_size()
+
+    def _parent_complete(self, success, result):
+        self._cache._set_cache_size(result)
+        if self._complete_cb:
+            self._complete_cb(result)
+
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+        yield self._logfile
+        self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(Message(None, message_type, message, **args))
+
+    def _child_log(self, message):
+        with open(self._logfile, 'a+') as log:
+            INDENT = "    "
+            EMPTYTIME = "--:--:--"
+
+            template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+            detail = ''
+            if message.detail is not None:
+                template += "\n\n{detail}"
+                detail = message.detail.rstrip('\n')
+                detail = INDENT + INDENT.join(detail.splitlines(True))
+
+            timecode = EMPTYTIME
+            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+                minutes, seconds = divmod(remainder, 60)
+                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+            message_text = template.format(timecode=timecode,
+                                           type=message.message_type.upper(),
+                                           name='cache_size',
+                                           message=message.message,
+                                           detail=detail)
+
+            log.write('{}\n'.format(message_text))
+            log.flush()
+
+        return message
+
+    def _child_process_data(self):
+        return {}
diff --git a/buildstream/_scheduler/jobs/cleanupjob.py b/buildstream/_scheduler/jobs/cleanupjob.py
new file mode 100644
index 0000000..2de3680
--- /dev/null
+++ b/buildstream/_scheduler/jobs/cleanupjob.py
@@ -0,0 +1,72 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from .job import Job
+from ..._platform import Platform
+from ..._message import Message
+
+
+class CleanupJob(Job):
+    def __init__(self, *args, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._complete_cb = complete_cb
+        self._cache = Platform._instance.artifactcache
+
+    def _child_process(self):
+        return self._cache.clean()
+
+    def _parent_complete(self, success, result):
+        self._cache._set_cache_size(result)
+        if self._complete_cb:
+            self._complete_cb()
+
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+        yield self._logfile
+        self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(Message(None, message_type, message, **args))
+
+    def _child_log(self, message):
+        message.action_name = self.action_name
+
+        with open(self._logfile, 'a+') as log:
+            message_text = self._format_frontend_message(message, '[cleanup]')
+            log.write('{}\n'.format(message_text))
+            log.flush()
+
+        return message
+
+    def _child_process_data(self):
+        return {}
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 7f8ac9e..376ef5a 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -51,10 +51,27 @@ class BuildQueue(Queue):
 
         return QueueStatus.READY
 
+    def _check_cache_size(self, job, element):
+        if not job.child_data:
+            return
+
+        artifact_size = job.child_data.get('artifact_size', False)
+
+        if artifact_size:
+            cache = element._get_artifact_cache()
+            cache._add_artifact_size(artifact_size)
+
+            if cache.get_approximate_cache_size() > self._scheduler.context.cache_quota:
+                self._scheduler._check_cache_size_real()
+
     def done(self, job, element, result, success):
 
         if success:
             # Inform element in main process that assembly is done
             element._assemble_done()
 
+        # This has to be done after _assemble_done, such that the
+        # element may register its cache key as required
+        self._check_cache_size(job, element)
+
         return True
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index efaa59e..430afc4 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -59,6 +59,11 @@ class PullQueue(Queue):
 
         element._pull_done()
 
+        # Build jobs will check the "approximate" size first. Since we
+        # do not get an artifact size from pull jobs, we have to
+        # actually check the cache size.
+        self._scheduler._check_cache_size_real()
+
         # Element._pull() returns True if it downloaded an artifact,
         # here we want to appear skipped if we did not download.
         return result
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 8ca3ac0..ac20d37 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -300,6 +300,8 @@ class Queue():
         # Update values that need to be synchronized in the main task
         # before calling any queue implementation
         self._update_workspaces(element, job)
+        if job.child_data:
+            element._get_artifact_cache().cache_size = job.child_data.get('cache_size')
 
         # Give the result of the job to the Queue implementor,
         # and determine if it should be considered as processed
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index bc182db..aeb3293 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -27,7 +27,8 @@ import datetime
 from contextlib import contextmanager
 
 # Local imports
-from .resources import Resources
+from .resources import Resources, ResourceType
+from .jobs import CacheSizeJob, CleanupJob
 
 
 # A decent return code for Scheduler.run()
@@ -312,6 +313,27 @@ class Scheduler():
         self.schedule_jobs(ready)
         self._sched()
 
+    def _run_cleanup(self, cache_size):
+        if cache_size and cache_size < self.context.cache_quota:
+            return
+
+        logpath = os.path.join(self.context.logdir, 'cleanup.{pid}.log')
+        job = CleanupJob(self, 'cleanup', logpath,
+                         resources=[ResourceType.CACHE,
+                                    ResourceType.PROCESS],
+                         exclusive_resources=[ResourceType.CACHE],
+                         complete_cb=None)
+        self.schedule_jobs([job])
+
+    def _check_cache_size_real(self):
+        logpath = os.path.join(self.context.logdir, 'cache_size.{pid}.log')
+        job = CacheSizeJob(self, 'cache_size', logpath,
+                           resources=[ResourceType.CACHE,
+                                      ResourceType.PROCESS],
+                           exclusive_resources=[ResourceType.CACHE],
+                           complete_cb=self._run_cleanup)
+        self.schedule_jobs([job])
+
     # _suspend_jobs()
     #
     # Suspend all ongoing jobs.
diff --git a/buildstream/data/userconfig.yaml b/buildstream/data/userconfig.yaml
index 6bb54ff..6f9f190 100644
--- a/buildstream/data/userconfig.yaml
+++ b/buildstream/data/userconfig.yaml
@@ -23,6 +23,14 @@ artifactdir: ${XDG_CACHE_HOME}/buildstream/artifacts
 logdir: ${XDG_CACHE_HOME}/buildstream/logs
 
 #
+#    Cache
+#
+cache:
+  # Size of the artifact cache - BuildStream will attempt to keep the
+  # artifact cache within this size.
+  quota: infinity
+
+#
 #    Scheduler
 #
 scheduler:
diff --git a/buildstream/element.py b/buildstream/element.py
index fc21f80..518fb59 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -225,6 +225,7 @@ class Element(Plugin):
         self.__staged_sources_directory = None  # Location where Element.stage_sources() was called
         self.__tainted = None                   # Whether the artifact is tainted and should not be shared
         self.__required = False                 # Whether the artifact is required in the current session
+        self.__artifact_size = None             # The size of data committed to the artifact cache
 
         # hash tables of loaded artifact metadata, hashed by key
         self.__metadata_keys = {}                     # Strong and weak keys for this key
@@ -1397,6 +1398,16 @@ class Element(Plugin):
             workspace.clear_running_files()
             self._get_context().get_workspaces().save_config()
 
+            # We also need to update the required artifacts, since
+            # workspaced dependencies do not have a fixed cache key
+            # when the build starts.
+            #
+            # This does *not* cause a race condition, because
+            # _assemble_done is called before a cleanup job may be
+            # launched.
+            #
+            self.__artifacts.append_required_artifacts([self])
+
     # _assemble():
     #
     # Internal method for running the entire build phase.
@@ -1524,6 +1535,7 @@ class Element(Plugin):
                 }), os.path.join(metadir, 'workspaced-dependencies.yaml'))
 
                 with self.timed_activity("Caching artifact"):
+                    self.__artifact_size = utils._get_dir_size(assembledir)
                     self.__artifacts.commit(self, assembledir, self.__get_cache_keys_for_commit())
 
             # Finally cleanup the build dir
@@ -1763,6 +1775,25 @@ class Element(Plugin):
         workspaces = self._get_context().get_workspaces()
         return workspaces.get_workspace(self._get_full_name())
 
+    # _get_artifact_size()
+    #
+    # Get the size of the artifact produced by this element in the
+    # current pipeline - if this element has not been assembled or
+    # pulled, this will be None.
+    #
+    # Note that this is the size of an artifact *before* committing it
+    # to the cache, the size on disk may differ. It can act as an
+    # approximate guide for when to do a proper size calculation.
+    #
+    # Returns:
+    #    (int|None): The size of the artifact
+    #
+    def _get_artifact_size(self):
+        return self.__artifact_size
+
+    def _get_artifact_cache(self):
+        return self.__artifacts
+
     # _write_script():
     #
     # Writes a script to the given directory.
diff --git a/buildstream/utils.py b/buildstream/utils.py
index b81a6c8..e8270d8 100644
--- a/buildstream/utils.py
+++ b/buildstream/utils.py
@@ -96,7 +96,7 @@ class FileListResult():
         return ret
 
 
-def list_relative_paths(directory):
+def list_relative_paths(directory, *, list_dirs=True):
     """A generator for walking directory relative paths
 
     This generator is useful for checking the full manifest of
@@ -110,6 +110,7 @@ def list_relative_paths(directory):
 
     Args:
        directory (str): The directory to list files in
+       list_dirs (bool): Whether to list directories
 
     Yields:
        Relative filenames in `directory`
@@ -136,15 +137,16 @@ def list_relative_paths(directory):
         # subdirectories in the walked `dirpath`, so we extract
         # these symlinks from `dirnames`
         #
-        for d in dirnames:
-            fullpath = os.path.join(dirpath, d)
-            if os.path.islink(fullpath):
-                yield os.path.join(basepath, d)
+        if list_dirs:
+            for d in dirnames:
+                fullpath = os.path.join(dirpath, d)
+                if os.path.islink(fullpath):
+                    yield os.path.join(basepath, d)
 
         # We've decended into an empty directory, in this case we
         # want to include the directory itself, but not in any other
         # case.
-        if not filenames:
+        if list_dirs and not filenames:
             yield relpath
 
         # List the filenames in the walked directory
@@ -536,6 +538,76 @@ def save_file_atomic(filename, mode='w', *, buffering=-1, encoding=None,
         raise
 
 
+# _get_dir_size():
+#
+# Get the disk usage of a given directory in bytes.
+#
+# Arguments:
+#     (str) The path whose size to check.
+#
+# Returns:
+#     (int) The size on disk in bytes.
+#
+def _get_dir_size(path):
+    path = os.path.abspath(path)
+
+    def get_size(path):
+        total = 0
+
+        for f in os.scandir(path):
+            total += f.stat(follow_symlinks=False).st_size
+
+            if f.is_dir(follow_symlinks=False):
+                total += get_size(f.path)
+
+        return total
+
+    return get_size(path)
+
+
+# _parse_size():
+#
+# Convert a string representing data size to a number of
+# bytes. E.g. "2K" -> 2048.
+#
+# This uses the same format as systemd's
+# [resource-control](https://www.freedesktop.org/software/systemd/man/systemd.resource-control.html#).
+#
+# Arguments:
+#     size (str) The string to parse
+#     volume (str) A path on the volume to consider for percentage
+#                  specifications
+#
+# Returns:
+#     (int|None) The number of bytes, or None if 'infinity' was specified.
+#
+# Raises:
+#     UtilError if the string is not a valid data size.
+#
+def _parse_size(size, volume):
+    if size == 'infinity':
+        return None
+
+    matches = re.fullmatch(r'([0-9]+\.?[0-9]*)([KMGT%]?)', size)
+    if matches is None:
+        raise UtilError("{} is not a valid data size.".format(size))
+
+    num, unit = matches.groups()
+
+    if unit == '%':
+        num = float(num)
+        if num > 100:
+            raise UtilError("{}% is not a valid percentage value.".format(num))
+
+        stat_ = os.statvfs(volume)
+        disk_size = stat_.f_blocks * stat_.f_bsize
+
+        return disk_size * (num / 100)
+
+    units = ('', 'K', 'M', 'G', 'T')
+    return int(num) * 1024**units.index(unit)
+
+
 # A sentinel to be used as a default argument for functions that need
 # to distinguish between a kwarg set to None and an unset kwarg.
 _sentinel = object()
diff --git a/tests/artifactcache/expiry.py b/tests/artifactcache/expiry.py
new file mode 100644
index 0000000..4c74105
--- /dev/null
+++ b/tests/artifactcache/expiry.py
@@ -0,0 +1,264 @@
+import os
+
+import pytest
+
+from buildstream import _yaml
+from buildstream._exceptions import ErrorDomain, LoadErrorReason
+
+from tests.testutils import cli
+
+
+DATA_DIR = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    "expiry"
+)
+
+
+def create_element(name, path, dependencies, size):
+    os.makedirs(path, exist_ok=True)
+
+    # Create a file to be included in this element's artifact
+    with open(os.path.join(path, name + '_data'), 'wb+') as f:
+        f.write(os.urandom(size))
+
+    element = {
+        'kind': 'import',
+        'sources': [
+            {
+                'kind': 'local',
+                'path': os.path.join(path, name + '_data')
+            }
+        ],
+        'depends': dependencies
+    }
+    _yaml.dump(element, os.path.join(path, name))
+
+
+# Ensure that the cache successfully removes an old artifact if we do
+# not have enough space left.
+@pytest.mark.datafiles(DATA_DIR)
+def test_artifact_expires(cli, datafiles, tmpdir):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = os.path.join(project, 'elements')
+    cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
+    checkout = os.path.join(project, 'checkout')
+
+    cli.configure({
+        'cache': {
+            'quota': 10000000,
+        }
+    })
+
+    # Create an element that uses almost the entire cache (an empty
+    # ostree cache starts at about ~10KiB, so we need a bit of a
+    # buffer)
+    create_element('target.bst', element_path, [], 6000000)
+    res = cli.run(project=project, args=['build', 'target.bst'])
+    res.assert_success()
+
+    assert cli.get_element_state(project, 'target.bst') == 'cached'
+
+    # Our cache should now be almost full. Let's create another
+    # artifact and see if we can cause buildstream to delete the old
+    # one.
+    create_element('target2.bst', element_path, [], 6000000)
+    res = cli.run(project=project, args=['build', 'target2.bst'])
+    res.assert_success()
+
+    # Check that the correct element remains in the cache
+    assert cli.get_element_state(project, 'target.bst') != 'cached'
+    assert cli.get_element_state(project, 'target2.bst') == 'cached'
+
+
+# Ensure that we don't end up deleting the whole cache (or worse) if
+# we try to store an artifact that is too large to fit in the quota.
+@pytest.mark.parametrize('size', [
+    # Test an artifact that is obviously too large
+    (500000),
+    # Test an artifact that might be too large due to slight overhead
+    # of storing stuff in ostree
+    (399999)
+])
+@pytest.mark.datafiles(DATA_DIR)
+def test_artifact_too_large(cli, datafiles, tmpdir, size):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = os.path.join(project, 'elements')
+
+    cli.configure({
+        'cache': {
+            'quota': 400000
+        }
+    })
+
+    # Create an element whose artifact is too large
+    create_element('target.bst', element_path, [], size)
+    res = cli.run(project=project, args=['build', 'target.bst'])
+    res.assert_main_error(ErrorDomain.STREAM, None)
+
+
+@pytest.mark.datafiles(DATA_DIR)
+def test_expiry_order(cli, datafiles, tmpdir):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = os.path.join(project, 'elements')
+    cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
+    checkout = os.path.join(project, 'workspace')
+
+    cli.configure({
+        'cache': {
+            'quota': 9000000
+        }
+    })
+
+    # Create an artifact
+    create_element('dep.bst', element_path, [], 2000000)
+    res = cli.run(project=project, args=['build', 'dep.bst'])
+    res.assert_success()
+
+    # Create another artifact
+    create_element('unrelated.bst', element_path, [], 2000000)
+    res = cli.run(project=project, args=['build', 'unrelated.bst'])
+    res.assert_success()
+
+    # And build something else
+    create_element('target.bst', element_path, [], 2000000)
+    res = cli.run(project=project, args=['build', 'target.bst'])
+    res.assert_success()
+
+    create_element('target2.bst', element_path, [], 2000000)
+    res = cli.run(project=project, args=['build', 'target2.bst'])
+    res.assert_success()
+
+    # Now extract dep.bst
+    res = cli.run(project=project, args=['checkout', 'dep.bst', checkout])
+    res.assert_success()
+
+    # Finally, build something that will cause the cache to overflow
+    create_element('expire.bst', element_path, [], 2000000)
+    res = cli.run(project=project, args=['build', 'expire.bst'])
+    res.assert_success()
+
+    # While dep.bst was the first element to be created, it should not
+    # have been removed.
+    # Note that buildstream will reduce the cache to 50% of the
+    # original size - we therefore remove multiple elements.
+
+    assert (tuple(cli.get_element_state(project, element) for element in
+                  ('unrelated.bst', 'target.bst', 'target2.bst', 'dep.bst', 'expire.bst')) ==
+            ('buildable', 'buildable', 'buildable', 'cached', 'cached', ))
+
+
+# Ensure that we don't accidentally remove an artifact from something
+# in the current build pipeline, because that would be embarassing,
+# wouldn't it?
+@pytest.mark.datafiles(DATA_DIR)
+def test_keep_dependencies(cli, datafiles, tmpdir):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = os.path.join(project, 'elements')
+    cache_location = os.path.join(project, 'cache', 'artifacts', 'ostree')
+
+    cli.configure({
+        'cache': {
+            'quota': 10000000
+        }
+    })
+
+    # Create a pretty big dependency
+    create_element('dependency.bst', element_path, [], 5000000)
+    res = cli.run(project=project, args=['build', 'dependency.bst'])
+    res.assert_success()
+
+    # Now create some other unrelated artifact
+    create_element('unrelated.bst', element_path, [], 4000000)
+    res = cli.run(project=project, args=['build', 'unrelated.bst'])
+    res.assert_success()
+
+    # Check that the correct element remains in the cache
+    assert cli.get_element_state(project, 'dependency.bst') == 'cached'
+    assert cli.get_element_state(project, 'unrelated.bst') == 'cached'
+
+    # We try to build an element which depends on the LRU artifact,
+    # and could therefore fail if we didn't make sure dependencies
+    # aren't removed.
+    #
+    # Since some artifact caches may implement weak cache keys by
+    # duplicating artifacts (bad!) we need to make this equal in size
+    # or smaller than half the size of its dependencies.
+    #
+    create_element('target.bst', element_path, ['dependency.bst'], 2000000)
+    res = cli.run(project=project, args=['build', 'target.bst'])
+    res.assert_success()
+
+    assert cli.get_element_state(project, 'unrelated.bst') != 'cached'
+    assert cli.get_element_state(project, 'dependency.bst') == 'cached'
+    assert cli.get_element_state(project, 'target.bst') == 'cached'
+
+
+# Assert that we never delete a dependency required for a build tree
+@pytest.mark.datafiles(DATA_DIR)
+def test_never_delete_dependencies(cli, datafiles, tmpdir):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = os.path.join(project, 'elements')
+
+    cli.configure({
+        'cache': {
+            'quota': 10000000
+        }
+    })
+
+    # Create a build tree
+    create_element('dependency.bst', element_path, [], 8000000)
+    create_element('related.bst', element_path, ['dependency.bst'], 8000000)
+    create_element('target.bst', element_path, ['related.bst'], 8000000)
+    create_element('target2.bst', element_path, ['target.bst'], 8000000)
+
+    # We try to build this pipeline, but it's too big for the
+    # cache. Since all elements are required, the build should fail.
+    res = cli.run(project=project, args=['build', 'target2.bst'])
+    res.assert_main_error(ErrorDomain.STREAM, None)
+
+    assert cli.get_element_state(project, 'dependency.bst') == 'cached'
+
+    # This is *technically* above the cache limit. BuildStream accepts
+    # some fuzziness, since it's hard to assert that we don't create
+    # an artifact larger than the cache quota. We would have to remove
+    # the artifact after-the-fact, but since it is required for the
+    # current build and nothing broke yet, it's nicer to keep it
+    # around.
+    #
+    # This scenario is quite unlikely, and the cache overflow will be
+    # resolved if the user does something about it anyway.
+    #
+    assert cli.get_element_state(project, 'related.bst') == 'cached'
+
+    assert cli.get_element_state(project, 'target.bst') != 'cached'
+    assert cli.get_element_state(project, 'target2.bst') != 'cached'
+
+
+# Ensure that only valid cache quotas make it through the loading
+# process.
+@pytest.mark.parametrize("quota,success", [
+    ("1", True),
+    ("1K", True),
+    ("50%", True),
+    ("infinity", True),
+    ("0", True),
+    ("-1", False),
+    ("pony", False),
+    ("200%", False)
+])
+@pytest.mark.datafiles(DATA_DIR)
+def test_invalid_cache_quota(cli, datafiles, tmpdir, quota, success):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_path = os.path.join(project, 'elements')
+
+    cli.configure({
+        'cache': {
+            'quota': quota,
+        }
+    })
+
+    res = cli.run(project=project, args=['workspace', 'list'])
+    if success:
+        res.assert_success()
+    else:
+        res.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.INVALID_DATA)
diff --git a/tests/artifactcache/expiry/project.conf b/tests/artifactcache/expiry/project.conf
new file mode 100644
index 0000000..18db7da
--- /dev/null
+++ b/tests/artifactcache/expiry/project.conf
@@ -0,0 +1,14 @@
+# Project config for cache expiry test
+name: test
+element-path: elements
+aliases:
+  project_dir: file://{project_dir}
+options:
+  linux:
+    type: bool
+    description: Whether to expect a linux platform
+    default: True
+split-rules:
+  test:
+    - |
+      /tests/*
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index ca46b04..cdd6ff1 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -2,7 +2,7 @@ import os
 import shutil
 import pytest
 from collections import namedtuple
-from unittest.mock import MagicMock
+from unittest.mock import patch
 
 from buildstream._exceptions import ErrorDomain
 from tests.testutils import cli, create_artifact_share, create_element_size
@@ -17,6 +17,20 @@ DATA_DIR = os.path.join(
 )
 
 
+# The original result of os.statvfs so that we can mock it
+NORMAL_STAT = os.statvfs('/')
+
+
+def stat_tuple():
+    stat = NORMAL_STAT
+    bsize = stat.f_bsize
+
+    fields = [var for var in dir(stat) if isinstance(getattr(stat, var), int)][0:stat.n_fields]
+    statvfs_result = namedtuple('statvfs_result', ' '.join(fields))
+
+    return statvfs_result(*[getattr(stat, var) for var in fields])
+
+
 # Assert that a given artifact is in the share
 #
 def assert_shared(cli, share, project, element_name):
@@ -205,6 +219,7 @@ def test_push_after_pull(cli, tmpdir, datafiles):
 # Ensure that when an artifact's size exceeds available disk space
 # the least recently pushed artifact is deleted in order to make room for
 # the incoming artifact.
+@pytest.mark.xfail
 @pytest.mark.datafiles(DATA_DIR)
 def test_artifact_expires(cli, datafiles, tmpdir):
     project = os.path.join(datafiles.dirname, datafiles.basename)
@@ -213,13 +228,6 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     # Create an artifact share (remote artifact cache) in the tmpdir/artifactshare
     share = create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'))
 
-    # Mock the os.statvfs() call to return a named tuple which emulates an
-    # os.statvfs_result object
-    statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize')
-    os.statvfs = MagicMock(return_value=statvfs_result(f_blocks=int(10e9),
-                                                       f_bfree=(int(12e6) + int(2e9)),
-                                                       f_bsize=1))
-
     # Configure bst to push to the cache
     cli.configure({
         'artifacts': {'url': share.repo, 'push': True},
@@ -227,16 +235,26 @@ def test_artifact_expires(cli, datafiles, tmpdir):
 
     # Create and build an element of 5 MB
     create_element_size('element1.bst', element_path, [], int(5e6))  # [] => no deps
-    result = cli.run(project=project, args=['build', 'element1.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'element1.bst'])
     result.assert_success()
 
     # Create and build an element of 5 MB
     create_element_size('element2.bst', element_path, [], int(5e6))  # [] => no deps
-    result = cli.run(project=project, args=['build', 'element2.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'element2.bst'])
     result.assert_success()
 
+    # Mock the os.statvfs() call to return a named tuple which emulates an
+    # os.statvfs_result object
+    free_space = int(12e6)
+
+    free = stat_tuple()._replace(f_blocks=int(10e9), f_bfree=free_space + int(2e9), f_bsize=1)
+    with patch('os.statvfs', return_value=free):
+        result = cli.run(project=project, args=['push', 'element1.bst', 'element2.bst'])
+        result.assert_success()
+
     # update the share
     share.update_summary()
+    free_space -= 10e6
 
     # check that element's 1 and 2 are cached both locally and remotely
     assert cli.get_element_state(project, 'element1.bst') == 'cached'
@@ -244,18 +262,19 @@ def test_artifact_expires(cli, datafiles, tmpdir):
     assert cli.get_element_state(project, 'element2.bst') == 'cached'
     assert_shared(cli, share, project, 'element2.bst')
 
-    # update mocked available disk space now that two 5 MB artifacts have been added
-    os.statvfs = MagicMock(return_value=statvfs_result(f_blocks=int(10e9),
-                                                       f_bfree=(int(2e6) + int(2e9)),
-                                                       f_bsize=1))
-
     # Create and build another element of 5 MB (This will exceed the free disk space available)
     create_element_size('element3.bst', element_path, [], int(5e6))
-    result = cli.run(project=project, args=['build', 'element3.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'element3.bst'])
     result.assert_success()
 
+    free = stat_tuple()._replace(f_blocks=int(10e9), f_bfree=free_space + int(2e9), f_bsize=1)
+    with patch('os.statvfs', return_value=free):
+        result = cli.run(project=project, args=['push', 'element3.bst'])
+        result.assert_success()
+
     # update the share
     share.update_summary()
+    free_space -= 5e6
 
     # Ensure it is cached both locally and remotely
     assert cli.get_element_state(project, 'element3.bst') == 'cached'
@@ -269,6 +288,7 @@ def test_artifact_expires(cli, datafiles, tmpdir):
 
 # Test that a large artifact, whose size exceeds the quota, is not pushed
 # to the remote share
+@pytest.mark.xfail
 @pytest.mark.datafiles(DATA_DIR)
 def test_artifact_too_large(cli, datafiles, tmpdir):
     project = os.path.join(datafiles.dirname, datafiles.basename)
@@ -277,12 +297,6 @@ def test_artifact_too_large(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     share = create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'))
 
-    # Mock a file system with 5 MB total space
-    statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize')
-    os.statvfs = MagicMock(return_value=statvfs_result(f_blocks=int(5e6) + int(2e9),
-                                                       f_bfree=(int(5e6) + int(2e9)),
-                                                       f_bsize=1))
-
     # Configure bst to push to the remote cache
     cli.configure({
         'artifacts': {'url': share.repo, 'push': True},
@@ -290,14 +304,20 @@ def test_artifact_too_large(cli, datafiles, tmpdir):
 
     # Create and push a 3MB element
     create_element_size('small_element.bst', element_path, [], int(3e6))
-    result = cli.run(project=project, args=['build', 'small_element.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'small_element.bst'])
     result.assert_success()
 
     # Create and try to push a 6MB element.
     create_element_size('large_element.bst', element_path, [], int(6e6))
-    result = cli.run(project=project, args=['build', 'large_element.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'large_element.bst'])
     result.assert_success()
 
+    # Mock a file system with 5 MB total space
+    free = stat_tuple()._replace(f_blocks=int(5e6) + int(2e9), f_bfree=int(5e6) + int(2e9), f_bsize=1)
+    with patch('os.statvfs', return_value=free):
+        result = cli.run(project=project, args=['push', 'small_element.bst', 'large_element.bst'])
+        result.assert_success()
+
     # update the cache
     share.update_summary()
 
@@ -323,12 +343,6 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
     # Create an artifact share (remote cache) in tmpdir/artifactshare
     share = create_artifact_share(os.path.join(str(tmpdir), 'artifactshare'))
 
-    # Mock a file system with 12 MB free disk space
-    statvfs_result = namedtuple('statvfs_result', 'f_blocks f_bfree f_bsize')
-    os.statvfs = MagicMock(return_value=statvfs_result(f_blocks=int(10e9) + int(2e9),
-                                                       f_bfree=(int(12e6) + int(2e9)),
-                                                       f_bsize=1))
-
     # Configure bst to push to the cache
     cli.configure({
         'artifacts': {'url': share.repo, 'push': True},
@@ -336,14 +350,23 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
 
     # Create and build 2 elements, each of 5 MB.
     create_element_size('element1.bst', element_path, [], int(5e6))
-    result = cli.run(project=project, args=['build', 'element1.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'element1.bst'])
     result.assert_success()
 
     create_element_size('element2.bst', element_path, [], int(5e6))
-    result = cli.run(project=project, args=['build', 'element2.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'element2.bst'])
     result.assert_success()
 
+    # Mock a file system with 12 MB free disk space
+    free_space = int(12e6)
+
+    free = stat_tuple()._replace(f_blocks=int(10e9) + int(2e9), f_bfree=free_space + int(2e9), f_bsize=1)
+    with patch('os.statvfs', return_value=free):
+        result = cli.run(project=project, args=['push', 'element1.bst', 'element2.bst'])
+        result.assert_success()
+
     share.update_summary()
+    free_space -= int(10e6)
 
     # Ensure they are cached locally
     assert cli.get_element_state(project, 'element1.bst') == 'cached'
@@ -367,10 +390,16 @@ def test_recently_pulled_artifact_does_not_expire(cli, datafiles, tmpdir):
 
     # Create and build the element3 (of 5 MB)
     create_element_size('element3.bst', element_path, [], int(5e6))
-    result = cli.run(project=project, args=['build', 'element3.bst'])
+    result = cli.run(project=project, args=['--pushers', '0', 'build', 'element3.bst'])
     result.assert_success()
 
+    free = stat_tuple()._replace(f_blocks=int(10e9) + int(2e9), f_bfree=free_space + int(2e9), f_bsize=1)
+    with patch('os.statvfs', return_value=free):
+        result = cli.run(project=project, args=['push', 'element3.bst'])
+        result.assert_success()
+
     share.update_summary()
+    free_space -= 5e6
 
     # Make sure it's cached locally and remotely
     assert cli.get_element_state(project, 'element3.bst') == 'cached'