Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:21:10 UTC

[buildstream] branch phil/712-high-priority-job-queue created (now 57236af)

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a change to branch phil/712-high-priority-job-queue
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 57236af  WIP: scheduler.py: Add a second high priority queue

This branch includes the following new commits:

     new 6996a43  contributing: add guidance on unit tests
     new da61230  Add cli main and user config option for 'pull-buildtrees' context.
     new 32d6e68  Don't pull artifact build trees by default.
     new ff2eece  plugins/sources/pip.py: also look for python version named "python"
     new 57236af  WIP: scheduler.py: Add a second high priority queue

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 04/05: plugins/sources/pip.py: also look for python version named "python"

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch phil/712-high-priority-job-queue
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ff2eece3ca739a1d2244a4a3bf07b1b488089e67
Author: Benjamin Schubert <be...@gmail.com>
AuthorDate: Thu Nov 8 11:12:27 2018 +0000

    plugins/sources/pip.py: also look for python version named "python"
    
    In some virtual environments, we might not have the binary with the
    exact python version, which would lead the current pip plugin to
    fail to find a working pip version.
    
    This adds "python" at the start of the list of valid python versions
---
 buildstream/plugins/sources/pip.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/buildstream/plugins/sources/pip.py b/buildstream/plugins/sources/pip.py
index 2ef4016..df2eeb5 100644
--- a/buildstream/plugins/sources/pip.py
+++ b/buildstream/plugins/sources/pip.py
@@ -80,6 +80,7 @@ _PYPI_INDEX_URL = 'https://pypi.org/simple/'
 
 # Used only for finding pip command
 _PYTHON_VERSIONS = [
+    'python',  # when running in a venv, we might not have the exact version
     'python2.7',
     'python3.0',
     'python3.1',
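
For illustration, the lookup this list feeds is essentially "first interpreter
found on PATH wins". A minimal sketch of that pattern (find_host_python and the
abbreviated version list are assumptions for this example, not the plugin's
actual code):

    import shutil

    # Abbreviated stand-in for the plugin's _PYTHON_VERSIONS list; 'python'
    # comes first so a venv's unversioned interpreter is preferred when present
    _PYTHON_VERSIONS = ['python', 'python2.7', 'python3.0', 'python3.1']

    def find_host_python():
        # Hypothetical helper: return the first interpreter found on PATH, or None
        for version in _PYTHON_VERSIONS:
            path = shutil.which(version)
            if path is not None:
                return path
        return None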


[buildstream] 02/05: Add cli main and user config option for 'pull-buildtrees' context.

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch phil/712-high-priority-job-queue
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit da612300d1b58311be64798fe8fc6e35a004407b
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Nov 5 13:16:53 2018 +0000

    Add cli main and user config option for 'pull-buildtrees' context.
    
    _context.py: Add pull_buildtrees global user context, the default
    of which is set to False via the addition of pull-buildtrees to
    userconfig.yaml cache group.
    
    _frontend/app.py & cli.py: Add --pull-buildtrees as a bst main
    option, which when passed will override the default or user defined
    context for pull_buildtrees.
    
    tests/completions/completions.py: Update for the added flag.
---
 buildstream/_context.py          | 8 +++++++-
 buildstream/_frontend/app.py     | 3 ++-
 buildstream/_frontend/cli.py     | 2 ++
 buildstream/data/userconfig.yaml | 3 +++
 tests/completions/completions.py | 1 +
 5 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/buildstream/_context.py b/buildstream/_context.py
index 876b747..960f371 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -104,6 +104,9 @@ class Context():
         # What to do when a build fails in non interactive mode
         self.sched_error_action = 'continue'
 
+        # Whether or not to attempt to pull build trees globally
+        self.pull_buildtrees = None
+
         # Whether elements must be rebuilt when their dependencies have changed
         self._strict_build_plan = None
 
@@ -178,13 +181,16 @@ class Context():
         # our artifactdir - the artifactdir may not have been created
         # yet.
         cache = _yaml.node_get(defaults, Mapping, 'cache')
-        _yaml.node_validate(cache, ['quota'])
+        _yaml.node_validate(cache, ['quota', 'pull-buildtrees'])
 
         self.config_cache_quota = _yaml.node_get(cache, str, 'quota', default_value='infinity')
 
         # Load artifact share configuration
         self.artifact_cache_specs = ArtifactCache.specs_from_config_node(defaults)
 
+        # Load pull build trees configuration
+        self.pull_buildtrees = _yaml.node_get(cache, bool, 'pull-buildtrees')
+
         # Load logging config
         logging = _yaml.node_get(defaults, Mapping, 'logging')
         _yaml.node_validate(logging, [
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 6b292a4..b42b94b 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -182,7 +182,8 @@ class App():
             'fetchers': 'sched_fetchers',
             'builders': 'sched_builders',
             'pushers': 'sched_pushers',
-            'network_retries': 'sched_network_retries'
+            'network_retries': 'sched_network_retries',
+            'pull_buildtrees': 'pull_buildtrees'
         }
         for cli_option, context_attr in override_map.items():
             option_value = self._main_options.get(cli_option)
diff --git a/buildstream/_frontend/cli.py b/buildstream/_frontend/cli.py
index 8563295..b75bf45 100644
--- a/buildstream/_frontend/cli.py
+++ b/buildstream/_frontend/cli.py
@@ -219,6 +219,8 @@ def print_version(ctx, param, value):
               help="Specify a project option")
 @click.option('--default-mirror', default=None,
               help="The mirror to fetch from first, before attempting other mirrors")
+@click.option('--pull-buildtrees', is_flag=True, default=None,
+              help="Include an element's build tree when pulling remote element artifacts")
 @click.pass_context
 def cli(context, **kwargs):
     """Build and manipulate BuildStream projects
diff --git a/buildstream/data/userconfig.yaml b/buildstream/data/userconfig.yaml
index efe419c..f540a97 100644
--- a/buildstream/data/userconfig.yaml
+++ b/buildstream/data/userconfig.yaml
@@ -35,6 +35,9 @@ cache:
   # to the size of the file system containing the cache.
   quota: infinity
 
+  # Whether to pull build trees when downloading element artifacts
+  pull-buildtrees: False
+
 #
 #    Scheduler
 #
diff --git a/tests/completions/completions.py b/tests/completions/completions.py
index e6d15e6..8372874 100644
--- a/tests/completions/completions.py
+++ b/tests/completions/completions.py
@@ -42,6 +42,7 @@ MAIN_OPTIONS = [
     "-o ",
     "--option ",
     "--on-error ",
+    "--pull-buildtrees ",
     "--pushers ",
     "--strict ",
     "--verbose ",


[buildstream] 03/05: Don't pull artifact build trees by default.

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch phil/712-high-priority-job-queue
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 32d6e68ff8d8a56f1d0418b7fa0aed1ef64fd88a
Author: Tom Pollard <to...@codethink.co.uk>
AuthorDate: Mon Nov 5 13:32:29 2018 +0000

    Don't pull artifact build trees by default.
    
    Including cached build trees in element artifacts has led to mostly
    redundant download overhead when pulling from a remote artifact
    server. As such, the default behaviour of pull should not fetch the
    build tree object even if one is available.
    
    element.py: extend the relevant pull logic to consider a specified
    subdir and ensure the push logic does not lead to partial artifact
    pushes. Change the assumption that the buildtree can be extracted
    whenever the artifact is cached. __cached_buildtree() and
    __pull_directories() helpers added.
    
    _artifactcache/: artifactcache.py & cascache.py inclusion of
    helper functions for subdir artifact checking & fetching, fetch
    logic extended to only pull required artifact directories.
    extract & checkout updated to handle full/partial operation.
    
    tests/: addition of integration test pullbuildtrees.py,
    buildtrees.py adapted cli options, testutils/artifactshare.py
    has_artifact changed to return artifact digest on true condition.
---
 NEWS                                        |  14 ++-
 buildstream/_artifactcache/artifactcache.py |  32 +++++-
 buildstream/_artifactcache/cascache.py      |  73 ++++++++++---
 buildstream/element.py                      | 162 +++++++++++++++++++++-------
 tests/integration/build-tree.py             |   4 +-
 tests/integration/pullbuildtrees.py         | 147 +++++++++++++++++++++++++
 tests/testutils/artifactshare.py            |   6 +-
 7 files changed, 378 insertions(+), 60 deletions(-)

diff --git a/NEWS b/NEWS
index 589a2a2..632286f 100644
--- a/NEWS
+++ b/NEWS
@@ -38,13 +38,23 @@ buildstream 1.3.1
     a bug fix to workspaces so they can be built in workspaces too.
 
   o Creating a build shell through the interactive mode or `bst shell --build`
-    will now use the cached build tree. It is now easier to debug local build
-    failures.
+    will now use the cached build tree if available locally. It is now easier to
+    debug local build failures.
 
   o `bst shell --sysroot` now takes any directory that contains a sysroot,
     instead of just a specially-formatted build-root with a `root` and `scratch`
     subdirectory.
 
+  o Because an element's `build tree` is now cached as part of its artifact,
+    artifact sizes have in some cases increased significantly. In *most* cases
+    build trees are not used when building targets, so by default bst 'pull' &
+    'build' will not fetch build trees from remotes. This behaviour can be
+    overridden with the cli main option '--pull-buildtrees', or the user
+    configuration cache group option 'pull-buildtrees: True'. The override will
+    also add the build tree to already cached artifacts. When populating an
+    artifactcache server, only 'complete' elements can be pushed; an element
+    expected to have a populated build tree must have it cached before pushing.
+
 
 =================
 buildstream 1.1.5
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index b032446..7080f21 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -476,6 +476,22 @@ class ArtifactCache():
 
         return self.cas.contains(ref)
 
+    # contains_subdir_artifact():
+    #
+    # Check whether an artifact element contains a digest for a subdir
+    # which is populated in the cache, i.e. non-dangling.
+    #
+    # Args:
+    #     element (Element): The Element to check
+    #     key (str): The cache key to use
+    #     subdir (str): The subdir to check
+    #
+    # Returns: True if the subdir exists & is populated in the cache, False otherwise
+    #
+    def contains_subdir_artifact(self, element, key, subdir):
+        ref = self.get_artifact_fullname(element, key)
+        return self.cas.contains_subdir_artifact(ref, subdir)
+
     # list_artifacts():
     #
     # List artifacts in this cache in LRU order.
@@ -533,6 +549,7 @@ class ArtifactCache():
     # Args:
     #     element (Element): The Element to extract
     #     key (str): The cache key to use
+    #     subdir (str): Optional specific subdir to extract
     #
     # Raises:
     #     ArtifactError: In cases where there was an OSError, or if the artifact
@@ -540,12 +557,12 @@ class ArtifactCache():
     #
     # Returns: path to extracted artifact
     #
-    def extract(self, element, key):
+    def extract(self, element, key, subdir=None):
         ref = self.get_artifact_fullname(element, key)
 
         path = os.path.join(self.extractdir, element._get_project().name, element.normal_name)
 
-        return self.cas.extract(ref, path)
+        return self.cas.extract(ref, path, subdir=subdir)
 
     # commit():
     #
@@ -666,11 +683,13 @@ class ArtifactCache():
     #     element (Element): The Element whose artifact is to be fetched
     #     key (str): The cache key to use
     #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
     #
     # Returns:
     #   (bool): True if pull was successful, False if artifact was not available
     #
-    def pull(self, element, key, *, progress=None):
+    def pull(self, element, key, *, progress=None, subdir=None, excluded_subdirs=None):
         ref = self.get_artifact_fullname(element, key)
 
         project = element._get_project()
@@ -680,8 +699,13 @@ class ArtifactCache():
                 display_key = element._get_brief_display_key()
                 element.status("Pulling artifact {} <- {}".format(display_key, remote.spec.url))
 
-                if self.cas.pull(ref, remote, progress=progress):
+                if self.cas.pull(ref, remote, progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs):
                     element.info("Pulled artifact {} <- {}".format(display_key, remote.spec.url))
+                    if subdir:
+                        # Attempt to extract the subdir into the artifact extract dir if it
+                        # already exists without containing the subdir. If the artifact extract
+                        # dir does not exist, a full extraction will be performed.
+                        self.extract(element, key, subdir)
                     # no need to pull from additional remotes
                     return True
                 else:
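
Summarising the pull flow above: remotes are tried in order, and a successful
subdir pull immediately triggers an extract so that a previously extracted
artifact is completed. A simplified sketch (assumed helper names, not the real
method):

    def pull_from_remotes(remotes, ref, subdir, excluded_subdirs, cas_pull, extract):
        # cas_pull and extract stand in for CASCache.pull() and extract()
        for remote in remotes:
            if cas_pull(ref, remote, subdir=subdir, excluded_subdirs=excluded_subdirs):
                if subdir:
                    # Complete a pre-existing extract dir that lacks the subdir
                    extract(ref, subdir)
                return True  # no need to pull from additional remotes
        return False
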
diff --git a/buildstream/_artifactcache/cascache.py b/buildstream/_artifactcache/cascache.py
index b6e26ec..f07bd24 100644
--- a/buildstream/_artifactcache/cascache.py
+++ b/buildstream/_artifactcache/cascache.py
@@ -82,6 +82,27 @@ class CASCache():
         # This assumes that the repository doesn't have any dangling pointers
         return os.path.exists(refpath)
 
+    # contains_subdir_artifact():
+    #
+    # Check whether the specified artifact element tree has a digest for a subdir
+    # which is populated in the cache, i.e. non-dangling.
+    #
+    # Args:
+    #     ref (str): The ref to check
+    #     subdir (str): The subdir to check
+    #
+    # Returns: True if the subdir exists & is populated in the cache, False otherwise
+    #
+    def contains_subdir_artifact(self, ref, subdir):
+        tree = self.resolve_ref(ref)
+
+        # This assumes that the subdir digest is present in the element tree
+        subdirdigest = self._get_subdir(tree, subdir)
+        objpath = self.objpath(subdirdigest)
+
+        # True if subdir content is cached or if empty as expected
+        return os.path.exists(objpath)
+
     # extract():
     #
     # Extract cached directory for the specified ref if it hasn't
@@ -90,19 +111,30 @@ class CASCache():
     # Args:
     #     ref (str): The ref whose directory to extract
     #     path (str): The destination path
+    #     subdir (str): Optional specific dir to extract
     #
     # Raises:
     #     CASError: In cases where there was an OSError, or if the ref did not exist.
     #
     # Returns: path to extracted directory
     #
-    def extract(self, ref, path):
+    def extract(self, ref, path, subdir=None):
         tree = self.resolve_ref(ref, update_mtime=True)
 
-        dest = os.path.join(path, tree.hash)
+        originaldest = dest = os.path.join(path, tree.hash)
+
+        # If the artifact is already extracted, check whether the optional
+        # subdir has also been extracted. If the artifact has not been
+        # extracted, a full extraction will include the optional subdir.
         if os.path.isdir(dest):
-            # directory has already been extracted
-            return dest
+            if subdir:
+                if not os.path.isdir(os.path.join(dest, subdir)):
+                    dest = os.path.join(dest, subdir)
+                    tree = self._get_subdir(tree, subdir)
+                else:
+                    return dest
+            else:
+                return dest
 
         with tempfile.TemporaryDirectory(prefix='tmp', dir=self.tmpdir) as tmpdir:
             checkoutdir = os.path.join(tmpdir, ref)
@@ -120,7 +152,7 @@ class CASCache():
                 if e.errno not in [errno.ENOTEMPTY, errno.EEXIST]:
                     raise CASError("Failed to extract directory for ref '{}': {}".format(ref, e)) from e
 
-        return dest
+        return originaldest
 
     # commit():
     #
@@ -193,11 +225,13 @@ class CASCache():
     #     ref (str): The ref to pull
     #     remote (CASRemote): The remote repository to pull from
     #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
     #
     # Returns:
     #   (bool): True if pull was successful, False if ref was not available
     #
-    def pull(self, ref, remote, *, progress=None):
+    def pull(self, ref, remote, *, progress=None, subdir=None, excluded_subdirs=None):
         try:
             remote.init()
 
@@ -209,7 +243,12 @@ class CASCache():
             tree.hash = response.digest.hash
             tree.size_bytes = response.digest.size_bytes
 
-            self._fetch_directory(remote, tree)
+            # Check if the element artifact is present; if so, just fetch the subdir.
+            if subdir and os.path.exists(self.objpath(tree)):
+                self._fetch_subdir(remote, tree, subdir)
+            else:
+                # Fetch the artifact; excluded_subdirs is determined by the pull queue
+                self._fetch_directory(remote, tree, excluded_subdirs=excluded_subdirs)
 
             self.set_ref(ref, tree)
 
@@ -607,8 +646,10 @@ class CASCache():
                          stat.S_IRGRP | stat.S_IXGRP | stat.S_IROTH | stat.S_IXOTH)
 
         for dirnode in directory.directories:
-            fullpath = os.path.join(dest, dirnode.name)
-            self._checkout(fullpath, dirnode.digest)
+            # Don't try to checkout a dangling ref
+            if os.path.exists(self.objpath(dirnode.digest)):
+                fullpath = os.path.join(dest, dirnode.name)
+                self._checkout(fullpath, dirnode.digest)
 
         for symlinknode in directory.symlinks:
             # symlink
@@ -863,11 +904,14 @@ class CASCache():
     # Args:
     #     remote (Remote): The remote to use.
     #     dir_digest (Digest): Digest object for the directory to fetch.
+    #     excluded_subdirs (list): The optional list of subdirs to not fetch
     #
-    def _fetch_directory(self, remote, dir_digest):
+    def _fetch_directory(self, remote, dir_digest, *, excluded_subdirs=None):
         fetch_queue = [dir_digest]
         fetch_next_queue = []
         batch = _CASBatchRead(remote)
+        if not excluded_subdirs:
+            excluded_subdirs = []
 
         while len(fetch_queue) + len(fetch_next_queue) > 0:
             if not fetch_queue:
@@ -882,8 +926,9 @@ class CASCache():
                 directory.ParseFromString(f.read())
 
             for dirnode in directory.directories:
-                batch = self._fetch_directory_node(remote, dirnode.digest, batch,
-                                                   fetch_queue, fetch_next_queue, recursive=True)
+                if dirnode.name not in excluded_subdirs:
+                    batch = self._fetch_directory_node(remote, dirnode.digest, batch,
+                                                       fetch_queue, fetch_next_queue, recursive=True)
 
             for filenode in directory.files:
                 batch = self._fetch_directory_node(remote, filenode.digest, batch,
@@ -892,6 +937,10 @@ class CASCache():
         # Fetch final batch
         self._fetch_directory_batch(remote, batch, fetch_queue, fetch_next_queue)
 
+    def _fetch_subdir(self, remote, tree, subdir):
+        subdirdigest = self._get_subdir(tree, subdir)
+        self._fetch_directory(remote, subdirdigest)
+
     def _fetch_tree(self, remote, digest):
         # download but do not store the Tree object
         with tempfile.NamedTemporaryFile(dir=self.tmpdir) as out:
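
The fetch decision added to CASCache.pull() reduces to a small rule: if only a
subdir is wanted and the top-level tree object already exists locally, fetch
just that subdir; otherwise fetch the whole directory, skipping any excluded
subdirs. A standalone sketch of that rule (plan_fetch and its tuple encoding
are illustrative assumptions):

    def plan_fetch(have_tree_locally, subdir, excluded_subdirs):
        # Decide which fetch the pull should perform
        if subdir and have_tree_locally:
            return ('fetch_subdir', subdir)
        return ('fetch_directory', excluded_subdirs or [])

    # Default pull: nothing cached yet, buildtree excluded
    assert plan_fetch(False, '', ['buildtree']) == ('fetch_directory', ['buildtree'])
    # Completing a partial artifact: tree cached, fetch only the buildtree
    assert plan_fetch(True, 'buildtree', []) == ('fetch_subdir', 'buildtree')
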
diff --git a/buildstream/element.py b/buildstream/element.py
index 31ca5d3..41ffdd1 100644
--- a/buildstream/element.py
+++ b/buildstream/element.py
@@ -1397,12 +1397,12 @@ class Element(Plugin):
                         with self.timed_activity("Staging local files at {}"
                                                  .format(workspace.get_absolute_path())):
                             workspace.stage(temp_staging_directory)
-                elif self._cached():
-                    # We have a cached buildtree to use, instead
+                # Check if we have a cached buildtree to use
+                elif self.__cached_buildtree():
                     artifact_base, _ = self.__extract()
                     import_dir = os.path.join(artifact_base, 'buildtree')
                 else:
-                    # No workspace, stage directly
+                    # No workspace or cached buildtree, stage source directly
                     for source in self.sources():
                         source._stage(temp_staging_directory)
 
@@ -1691,7 +1691,9 @@ class Element(Plugin):
 
     # _pull_pending()
     #
-    # Check whether the artifact will be pulled.
+    # Check whether the artifact will be pulled. If the pull operation is to
+    # include a specific subdir of the element artifact (from cli or user conf)
+    # then the local cache is queried for the subdir's existence.
     #
     # Returns:
     #   (bool): Whether a pull operation is pending
@@ -1701,8 +1703,15 @@ class Element(Plugin):
             # Workspace builds are never pushed to artifact servers
             return False
 
-        if self.__strong_cached:
-            # Artifact already in local cache
+        # Check whether the pull has been invoked with a specific subdir requested
+        # in the user context, so as to complete a partial artifact
+        subdir, _ = self.__pull_directories()
+
+        if self.__strong_cached and subdir:
+            # If we've specified a subdir, check if the subdir is cached locally
+            if self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, subdir):
+                return False
+        elif self.__strong_cached:
             return False
 
         # Pull is pending if artifact remote server available
@@ -1724,33 +1733,6 @@ class Element(Plugin):
 
         self._update_state()
 
-    def _pull_strong(self, *, progress=None):
-        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
-
-        key = self.__strict_cache_key
-        if not self.__artifacts.pull(self, key, progress=progress):
-            return False
-
-        # update weak ref by pointing it to this newly fetched artifact
-        self.__artifacts.link_key(self, key, weak_key)
-
-        return True
-
-    def _pull_weak(self, *, progress=None):
-        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
-
-        if not self.__artifacts.pull(self, weak_key, progress=progress):
-            return False
-
-        # extract strong cache key from this newly fetched artifact
-        self._pull_done()
-
-        # create tag for strong cache key
-        key = self._get_cache_key(strength=_KeyStrength.STRONG)
-        self.__artifacts.link_key(self, weak_key, key)
-
-        return True
-
     # _pull():
     #
     # Pull artifact from remote artifact repository into local artifact cache.
@@ -1763,11 +1745,15 @@ class Element(Plugin):
         def progress(percent, message):
             self.status(message)
 
+        # Get the optional specific subdir to pull and the optional list of
+        # subdirs not to pull, based on the user context
+        subdir, excluded_subdirs = self.__pull_directories()
+
         # Attempt to pull artifact without knowing whether it's available
-        pulled = self._pull_strong(progress=progress)
+        pulled = self.__pull_strong(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
 
         if not pulled and not self._cached() and not context.get_strict():
-            pulled = self._pull_weak(progress=progress)
+            pulled = self.__pull_weak(progress=progress, subdir=subdir, excluded_subdirs=excluded_subdirs)
 
         if not pulled:
             return False
@@ -1787,10 +1773,12 @@ class Element(Plugin):
             # No push remotes for this element's project
             return True
 
-        if not self._cached():
+        # Do not push elements that aren't cached, or that are cached with a dangling buildtree
+        # artifact, unless the element type is expected to have an empty buildtree directory
+        if not self.__cached_buildtree():
             return True
 
-        # Do not push tained artifact
+        # Do not push tainted artifact
         if self.__get_tainted():
             return True
 
@@ -2674,6 +2662,106 @@ class Element(Plugin):
 
         return utils._deduplicate(keys)
 
+    # __pull_strong():
+    #
+    # Attempt pulling given element from configured artifact caches with
+    # the strict cache key
+    #
+    # Args:
+    #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
+    #
+    # Returns:
+    #     (bool): Whether or not the pull was successful
+    #
+    def __pull_strong(self, *, progress=None, subdir=None, excluded_subdirs=None):
+        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
+        key = self.__strict_cache_key
+        if not self.__artifacts.pull(self, key, progress=progress, subdir=subdir,
+                                     excluded_subdirs=excluded_subdirs):
+            return False
+
+        # update weak ref by pointing it to this newly fetched artifact
+        self.__artifacts.link_key(self, key, weak_key)
+
+        return True
+
+    # __pull_weak():
+    #
+    # Attempt pulling given element from configured artifact caches with
+    # the weak cache key
+    #
+    # Args:
+    #     progress (callable): The progress callback, if any
+    #     subdir (str): The optional specific subdir to pull
+    #     excluded_subdirs (list): The optional list of subdirs to not pull
+    #
+    # Returns:
+    #     (bool): Whether or not the pull was successful
+    #
+    def __pull_weak(self, *, progress=None, subdir=None, excluded_subdirs=None):
+        weak_key = self._get_cache_key(strength=_KeyStrength.WEAK)
+        if not self.__artifacts.pull(self, weak_key, progress=progress, subdir=subdir,
+                                     excluded_subdirs=excluded_subdirs):
+            return False
+
+        # extract strong cache key from this newly fetched artifact
+        self._pull_done()
+
+        # create tag for strong cache key
+        key = self._get_cache_key(strength=_KeyStrength.STRONG)
+        self.__artifacts.link_key(self, weak_key, key)
+
+        return True
+
+    # __cached_buildtree():
+    #
+    # Check if cached element artifact contains expected buildtree
+    #
+    # Returns:
+    #     (bool): True if artifact cached with buildtree, False if
+    #             element not cached or missing expected buildtree
+    #
+    def __cached_buildtree(self):
+        context = self._get_context()
+
+        if not self._cached():
+            return False
+        elif context.get_strict():
+            if not self.__artifacts.contains_subdir_artifact(self, self.__strict_cache_key, 'buildtree'):
+                return False
+        elif not self.__artifacts.contains_subdir_artifact(self, self.__weak_cache_key, 'buildtree'):
+            return False
+
+        return True
+
+    # __pull_directories():
+    #
+    # Which directories to include or exclude given the current
+    # context
+    #
+    # Returns:
+    #     subdir (str): The optional specific subdir to include, based
+    #                   on user context
+    #     excluded_subdirs (list): The optional list of subdirs to not
+    #                              pull, referenced against subdir value
+    #
+    def __pull_directories(self):
+        context = self._get_context()
+
+        # Current default exclusions on pull
+        excluded_subdirs = ["buildtree"]
+        subdir = ''
+
+        # If buildtrees are to be pulled, remove the value from exclusion list
+        # and set specific subdir
+        if context.pull_buildtrees:
+            subdir = "buildtree"
+            excluded_subdirs.remove(subdir)
+
+        return (subdir, excluded_subdirs)
+
 
 def _overlap_error_detail(f, forbidden_overlap_elements, elements):
     if forbidden_overlap_elements:
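
The element-side behaviour hinges on __pull_directories() shown above. As a
worked example, here is the same logic as a standalone function (with a plain
bool in place of the context object) and its two possible outcomes:

    def pull_directories(pull_buildtrees):
        # Default exclusion on pull; enabling pull_buildtrees instead requests
        # 'buildtree' as the specific subdir and clears the exclusion
        excluded_subdirs = ["buildtree"]
        subdir = ''
        if pull_buildtrees:
            subdir = "buildtree"
            excluded_subdirs.remove(subdir)
        return (subdir, excluded_subdirs)

    assert pull_directories(False) == ('', ['buildtree'])  # default behaviour
    assert pull_directories(True) == ('buildtree', [])     # --pull-buildtrees
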
diff --git a/tests/integration/build-tree.py b/tests/integration/build-tree.py
index df9006a..91a18a1 100644
--- a/tests/integration/build-tree.py
+++ b/tests/integration/build-tree.py
@@ -70,8 +70,8 @@ def test_buildtree_pulled(cli, tmpdir, datafiles):
         })
         assert cli.get_element_state(project, element_name) != 'cached'
 
-        # Pull from cache
-        result = cli.run(project=project, args=['pull', '--deps', 'all', element_name])
+        # Pull from cache, ensuring the cli option is set to pull the buildtree
+        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', '--deps', 'all', element_name])
         result.assert_success()
 
         # Check it's using the cached build tree
diff --git a/tests/integration/pullbuildtrees.py b/tests/integration/pullbuildtrees.py
new file mode 100644
index 0000000..94da21d
--- /dev/null
+++ b/tests/integration/pullbuildtrees.py
@@ -0,0 +1,147 @@
+import os
+import shutil
+import pytest
+
+from tests.testutils import cli_integration as cli, create_artifact_share
+from tests.testutils.integration import assert_contains
+from buildstream._exceptions import ErrorDomain, LoadErrorReason
+
+
+DATA_DIR = os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    "project"
+)
+
+
+# Remove artifact cache & set cli.config value of pull-buildtrees
+# to false, which is the default user context. The cache has to be
+# cleared as just forcefully removing the refpath leaves dangling objects.
+def default_state(cli, tmpdir, share):
+    shutil.rmtree(os.path.join(str(tmpdir), 'artifacts'))
+    cli.configure({
+        'artifacts': {'url': share.repo, 'push': False},
+        'artifactdir': os.path.join(str(tmpdir), 'artifacts'),
+        'cache': {'pull-buildtrees': False},
+    })
+
+
+# A test to capture the integration of the pullbuildtrees
+# behaviour, which by default is to not include the buildtree
+# directory of an element.
+@pytest.mark.integration
+@pytest.mark.datafiles(DATA_DIR)
+def test_pullbuildtrees(cli, tmpdir, datafiles, integration_cache):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+    element_name = 'autotools/amhello.bst'
+
+    # Create artifact shares for pull & push testing
+    with create_artifact_share(os.path.join(str(tmpdir), 'share1')) as share1,\
+        create_artifact_share(os.path.join(str(tmpdir), 'share2')) as share2:
+        cli.configure({
+            'artifacts': {'url': share1.repo, 'push': True},
+            'artifactdir': os.path.join(str(tmpdir), 'artifacts')
+        })
+
+        # Build autotools element, check it is pushed, delete local
+        result = cli.run(project=project, args=['build', element_name])
+        assert result.exit_code == 0
+        assert cli.get_element_state(project, element_name) == 'cached'
+        assert share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
+        default_state(cli, tmpdir, share1)
+
+        # Pull artifact with default config, assert that pulling again
+        # doesn't create a pull job, then assert that pulling with the
+        # buildtrees user config set creates a pull job.
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name not in result.get_pulled_elements()
+        cli.configure({'cache': {'pull-buildtrees': True}})
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        default_state(cli, tmpdir, share1)
+
+        # Pull artifact with default config, then assert that pulling
+        # with buildtrees cli flag set creates a pull job.
+        # Also assert that the buildtree is added to the artifact's
+        # extract dir
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        elementdigest = share1.has_artifact('test', element_name, cli.get_element_key(project, element_name))
+        buildtreedir = os.path.join(str(tmpdir), 'artifacts', 'extract', 'test', 'autotools-amhello',
+                                    elementdigest.hash, 'buildtree')
+        assert not os.path.isdir(buildtreedir)
+        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        assert os.path.isdir(buildtreedir)
+        default_state(cli, tmpdir, share1)
+
+        # Pull artifact with pullbuildtrees set in user config, then assert
+        # that pulling again with the same user config doesn't create a pull
+        # job, nor does pulling with the buildtrees cli flag set.
+        cli.configure({'cache': {'pull-buildtrees': True}})
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name not in result.get_pulled_elements()
+        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
+        assert element_name not in result.get_pulled_elements()
+        default_state(cli, tmpdir, share1)
+
+        # Pull artifact with default config and buildtrees cli flag set, then assert
+        # that pulling with pullbuildtrees set in user config doesn't create a pull
+        # job.
+        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        cli.configure({'cache': {'pull-buildtrees': True}})
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name not in result.get_pulled_elements()
+        default_state(cli, tmpdir, share1)
+
+        # Assert that a partial build element (not containing a populated buildtree dir)
+        # can't be pushed to an artifact share, then assert that a complete build element
+        # can be. This will attempt a partial pull from share1 and then a partial push
+        # to share2
+        result = cli.run(project=project, args=['pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        cli.configure({'artifacts': {'url': share2.repo, 'push': True}})
+        result = cli.run(project=project, args=['push', element_name])
+        assert element_name not in result.get_pushed_elements()
+        assert not share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
+
+        # Assert that after pulling the missing buildtree the element artifact can be
+        # successfully pushed to the remote. This will attempt to pull the buildtree
+        # from share1 and then a 'complete' push to share2
+        cli.configure({'artifacts': {'url': share1.repo, 'push': False}})
+        result = cli.run(project=project, args=['--pull-buildtrees', 'pull', element_name])
+        assert element_name in result.get_pulled_elements()
+        cli.configure({'artifacts': {'url': share2.repo, 'push': True}})
+        result = cli.run(project=project, args=['push', element_name])
+        assert element_name in result.get_pushed_elements()
+        assert share2.has_artifact('test', element_name, cli.get_element_key(project, element_name))
+        default_state(cli, tmpdir, share1)
+
+
+# Ensure that only valid pull-buildtrees boolean options make it through the loading
+# process.
+@pytest.mark.parametrize("value,success", [
+    (True, True),
+    (False, True),
+    ("pony", False),
+    ("1", False)
+])
+@pytest.mark.datafiles(DATA_DIR)
+def test_invalid_cache_pullbuildtrees(cli, datafiles, tmpdir, value, success):
+    project = os.path.join(datafiles.dirname, datafiles.basename)
+
+    cli.configure({
+        'cache': {
+            'pull-buildtrees': value,
+        }
+    })
+
+    res = cli.run(project=project, args=['workspace', 'list'])
+    if success:
+        res.assert_success()
+    else:
+        res.assert_main_error(ErrorDomain.LOAD, LoadErrorReason.ILLEGAL_COMPOSITE)
diff --git a/tests/testutils/artifactshare.py b/tests/testutils/artifactshare.py
index 02f76de..91dcb9b 100644
--- a/tests/testutils/artifactshare.py
+++ b/tests/testutils/artifactshare.py
@@ -114,7 +114,7 @@ class ArtifactShare():
     #    cache_key (str): The cache key
     #
     # Returns:
-    #    (bool): True if the artifact exists in the share, otherwise false.
+    #    (str): artifact digest if the artifact exists in the share, otherwise None.
     def has_artifact(self, project_name, element_name, cache_key):
 
         # NOTE: This should be kept in line with our
@@ -134,9 +134,9 @@ class ArtifactShare():
 
         try:
             tree = self.cas.resolve_ref(artifact_key)
-            return True
+            return tree
         except CASError:
-            return False
+            return None
 
     # close():
     #


[buildstream] 05/05: WIP: scheduler.py: Add a second high priority queue

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch phil/712-high-priority-job-queue
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 57236af65c306ebbbec6d01f99593319b8cdeabf
Author: Phil Dawson <ph...@codethink.co.uk>
AuthorDate: Wed Nov 21 09:10:05 2018 +0000

    WIP: scheduler.py: Add a second high priority queue
    
    Adds a queue which allows "high priority" jobs to bypass the main waiting_jobs
    queue. This is then used to ensure that fetch jobs are prioritised over pull
    jobs.
---
 buildstream/_scheduler/queues/fetchqueue.py |  1 +
 buildstream/_scheduler/queues/queue.py      |  1 +
 buildstream/_scheduler/scheduler.py         | 58 +++++++++++++++++------------
 3 files changed, 37 insertions(+), 23 deletions(-)

diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index 446dbbd..5c441ba 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -33,6 +33,7 @@ class FetchQueue(Queue):
     action_name = "Fetch"
     complete_name = "Fetched"
     resources = [ResourceType.DOWNLOAD]
+    high_priority = True
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 909cebb..3c04140 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -58,6 +58,7 @@ class Queue():
     action_name = None
     complete_name = None
     resources = []                     # Resources this queue's jobs want
+    high_priority = False              # Whether jobs from this queue should be prioritised by the scheduler
 
     def __init__(self, scheduler):
 
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index b76c730..1b05415 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -71,12 +71,13 @@ class Scheduler():
         #
         # Public members
         #
-        self.active_jobs = []       # Jobs currently being run in the scheduler
-        self.waiting_jobs = []      # Jobs waiting for resources
-        self.queues = None          # Exposed for the frontend to print summaries
-        self.context = context      # The Context object shared with Queues
-        self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
-        self.suspended = False      # Whether the scheduler is currently suspended
+        self.active_jobs = []           # Jobs currently being run in the scheduler
+        self.waiting_jobs = []          # Jobs waiting for resources
+        self.waiting_priority_jobs = [] # High priority jobs waiting for resources
+        self.queues = None              # Exposed for the frontend to print summaries
+        self.context = context          # The Context object shared with Queues
+        self.terminated = False         # Whether the scheduler was asked to terminate or has terminated
+        self.suspended = False          # Whether the scheduler is currently suspended
 
         # These are shared with the Job, but should probably be removed or made private in some way.
         self.loop = None            # Shared for Job access to observe the message queue
@@ -220,7 +221,9 @@ class Scheduler():
     # run as soon as any other queueing jobs finish, provided sufficient
     # resources are available for them to run
     #
-    def schedule_jobs(self, jobs):
+    def schedule_jobs(self, jobs, priority_jobs):
+        for job in priority_jobs:
+            self.waiting_priority_jobs.append(job)
         for job in jobs:
             self.waiting_jobs.append(job)
 
@@ -257,7 +260,7 @@ class Scheduler():
                            resources=[ResourceType.CACHE,
                                       ResourceType.PROCESS],
                            complete_cb=self._run_cleanup)
-        self.schedule_jobs([job])
+        self.schedule_jobs([job], [])
 
     #######################################################
     #                  Local Private Methods              #
@@ -269,22 +272,27 @@ class Scheduler():
     # automatically when Scheduler.run() is called initially,
     #
     def _sched(self):
-        for job in self.waiting_jobs:
-            self._resources.reserve_exclusive_resources(job)
+        def allocate_resources_and_spawn_jobs(job_list):
+            for job in job_list:
+                self._resources.reserve_exclusive_resources(job)
+
+            for job in job_list:
+                if not self._resources.reserve_job_resources(job):
+                    continue
 
-        for job in self.waiting_jobs:
-            if not self._resources.reserve_job_resources(job):
-                continue
+                job.spawn()
+                job_list.remove(job)
+                self.active_jobs.append(job)
 
-            job.spawn()
-            self.waiting_jobs.remove(job)
-            self.active_jobs.append(job)
+                if self._job_start_callback:
+                    self._job_start_callback(job)
 
-            if self._job_start_callback:
-                self._job_start_callback(job)
+        # Process jobs from the high priority list first
+        allocate_resources_and_spawn_jobs(self.waiting_priority_jobs)
+        allocate_resources_and_spawn_jobs(self.waiting_jobs)
 
         # If nothing's ticking, time to bail out
-        if not self.active_jobs and not self.waiting_jobs:
+        if not self.active_jobs and not self.waiting_jobs and not self.waiting_priority_jobs:
             self.loop.stop()
 
     # _schedule_queue_jobs()
@@ -298,6 +306,7 @@ class Scheduler():
     #
     def _schedule_queue_jobs(self):
         ready = []
+        ready_priority = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
@@ -322,16 +331,19 @@ class Scheduler():
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            ready.extend(chain.from_iterable(
-                queue.pop_ready_jobs() for queue in reversed(self.queues)
-            ))
+
+            for queue in reversed(self.queues):
+                if queue.high_priority:
+                    ready_priority.extend(queue.pop_ready_jobs())
+                else:
+                    ready.extend(queue.pop_ready_jobs())
 
             # pop_ready_jobs() may have skipped jobs, adding them to
             # the done_queue.  Pull these skipped elements forward to
             # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        self.schedule_jobs(ready)
+        self.schedule_jobs(ready, ready_priority)
         self._sched()
 
     # _run_cleanup()
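
A compact sketch of the two-list scheduling order this WIP introduces (names
are simplified, and try_reserve stands in for the resource reservation calls):
on every scheduling pass, jobs on the high priority list are offered resources
before the regular waiting list.

    class TwoQueueScheduler:
        def __init__(self):
            self.waiting_jobs = []           # Regular jobs waiting for resources
            self.waiting_priority_jobs = []  # High priority jobs waiting for resources
            self.active_jobs = []

        def schedule_jobs(self, jobs, priority_jobs):
            self.waiting_priority_jobs.extend(priority_jobs)
            self.waiting_jobs.extend(jobs)

        def sched(self, try_reserve):
            # Offer resources to priority jobs first, then to regular jobs
            for job_list in (self.waiting_priority_jobs, self.waiting_jobs):
                for job in list(job_list):   # iterate a copy while removing
                    if try_reserve(job):
                        job_list.remove(job)
                        self.active_jobs.append(job)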


[buildstream] 01/05: contributing: add guidance on unit tests

Posted by gi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch phil/712-high-priority-job-queue
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 6996a4344d2452d24042479b0b7b40f6946295e6
Author: Angelos Evripiotis <je...@bloomberg.net>
AuthorDate: Thu Nov 8 14:29:34 2018 +0000

    contributing: add guidance on unit tests
    
    Decrease uncertainty around whether unit tests are welcome in the
    project or not.
---
 CONTRIBUTING.rst | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst
index dcd21e0..da8bcab 100644
--- a/CONTRIBUTING.rst
+++ b/CONTRIBUTING.rst
@@ -1547,6 +1547,24 @@ Tests that run a sandbox should be decorated with::
 
 and use the integration cli helper.
 
+You should first aim to write tests that exercise your changes from the cli.
+This is so that the testing is end-to-end, and the changes are guaranteed to
+work for the end-user. The cli is considered stable, and so tests written in
+terms of it are unlikely to require updating as the internals of the software
+change over time.
+
+It may be impractical to sufficiently examine some changes this way. For
+example, the number of cases to test and the running time of each test may be
+too high. It may also be difficult to contrive circumstances to cover every
+line of the change. If this is the case, next you can consider also writing
+unit tests that work more directly on the changes.
+
+It is important to write unit tests in such a way that they do not break due to
+changes unrelated to what they are meant to test. For example, if the test
+relies on a lot of BuildStream internals, a large refactoring will likely
+require the test to be rewritten. Pure functions that only rely on the Python
+Standard Library are excellent candidates for unit testing.
+
 
 Measuring performance
 ---------------------
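
As a concrete example of the kind of test this guidance favours, here is a
pytest-style unit test of a pure function (both names are illustrative, not
taken from BuildStream):

    def deduplicate(sequence):
        # Pure function: drop duplicates while preserving first-seen order
        seen = set()
        return [item for item in sequence if not (item in seen or seen.add(item))]

    def test_deduplicate_preserves_order():
        assert deduplicate([3, 1, 3, 2, 1]) == [3, 1, 2]
        assert deduplicate([]) == []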