Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:49:34 UTC

[buildstream] branch juerg/cache-query-job-benchmark created (now a0637f4)

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a change to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at a0637f4  test

This branch includes the following new commits:

     new 847703c  tests/frontend/push.py: Allow pushing of dependencies
     new a2ef29b  tests/internals/pluginloading: Add missing get_ref() to FooSource
     new fb47c89  fetchqueue.py: Don't skip elements with a cached failure
     new afd578b  _pipeline.py: Drop the optimization for cached elements in the planner
     new fb36a74  _artifact.py: Make cache query explicit
     new 75df8f8  _elementsources.py: Make cache query explicit
     new 515eab7  element.py: Combine cache query and pull into `_load_artifact()`
     new ca909d9  Move artifact and source cache query to a job thread
     new c191ad8  Call _initialize_state() in Element._new_from_load_element()
     new 9136861  hack for benchmark
     new 9068295  pipe hack
     new a0637f4  test

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 02/12: tests/internals/pluginloading: Add missing get_ref() to FooSource

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a2ef29b0c891e284db0f8007599920c0a71929af
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Oct 8 17:07:29 2020 +0200

    tests/internals/pluginloading: Add missing get_ref() to FooSource
---
 tests/internals/pluginloading/customsource/pluginsources/foo.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/tests/internals/pluginloading/customsource/pluginsources/foo.py b/tests/internals/pluginloading/customsource/pluginsources/foo.py
index c5229f3..fd44a54 100644
--- a/tests/internals/pluginloading/customsource/pluginsources/foo.py
+++ b/tests/internals/pluginloading/customsource/pluginsources/foo.py
@@ -14,6 +14,9 @@ class FooSource(Source):
     def get_unique_key(self):
         pass
 
+    def get_ref(self):
+        pass
+
 
 def setup():
     return FooSource
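
For context, the complete test stub plausibly ends up looking like the sketch below. Only get_unique_key(), the new get_ref() and setup() are taken from the diff above; the remaining no-op methods are assumptions about what a minimal Source test double contains.

from buildstream import Source


class FooSource(Source):
    def configure(self, node):
        pass

    def preflight(self):
        pass

    def get_unique_key(self):
        pass

    # The method this commit adds; returning None is fine for a stub
    # that has no notion of a ref.
    def get_ref(self):
        pass


def setup():
    return FooSource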


[buildstream] 10/12: hack for benchmark

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 91368618e014b535b929a3e747610f941a1055cf
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 15 07:14:58 2020 +0100

    hack for benchmark
---
 src/buildstream/_scheduler/jobs/job.py | 21 +++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index aa71b6e..c7e2624 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -22,6 +22,7 @@
 
 # System imports
 import asyncio
+import contextlib
 import datetime
 import itertools
 import multiprocessing
@@ -495,13 +496,19 @@ class ChildJob:
         self._pipe_w = pipe_w
         self._messenger.set_message_handler(self._child_message_handler)
 
+        # FIXME
+        silence = self.action_name == "Cache-query"
+
         # Time, log and run the action function
         #
-        with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
-            self._logfile, self._logdir
-        ) as filename:
+        if silence:
+            record_cm = contextlib.suppress()
+        else:
+            record_cm = self._messenger.recorded_messages(self._logfile, self._logdir)
+        with self._messenger.timed_suspendable() as timeinfo, record_cm as filename:
             try:
-                self.message(MessageType.START, self.action_name, logfile=filename)
+                if not silence:
+                    self.message(MessageType.START, self.action_name, logfile=filename)
 
                 with self._terminate_lock:
                     self._thread_id = threading.current_thread().ident
@@ -513,7 +520,8 @@ class ChildJob:
                     result = self.child_process()  # pylint: disable=assignment-from-no-return
                 except SkipJob as e:
                     elapsed = datetime.datetime.now() - timeinfo.start_time
-                    self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
+                    if not silence:
+                        self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
 
                     # Alert parent of skip by return code
                     return _ReturnCode.SKIPPED, None
@@ -560,7 +568,8 @@ class ChildJob:
                 else:
                     # No exception occurred in the action
                     elapsed = datetime.datetime.now() - timeinfo.start_time
-                    self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+                    if not silence:
+                        self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
 
                     # Shutdown needs to stay outside of the above context manager,
                     # make sure we don't try to handle SIGTERM while the process
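
The trick in this hunk is that contextlib.suppress() with no arguments is a do-nothing context manager, so one `with` statement can serve both the silenced and the recording path; its __enter__ returns None, which is why every use of `filename` must be guarded. A self-contained sketch of the pattern, where open_log() is a hypothetical stand-in for Messenger.recorded_messages():

import contextlib


@contextlib.contextmanager
def open_log(path):
    # Stand-in recorder: yields the log filename while "recording".
    yield path


def run_action(silence):
    # suppress() without arguments suppresses nothing; it merely
    # provides a no-op context manager whose __enter__ returns None.
    record_cm = contextlib.suppress() if silence else open_log("job.log")
    with record_cm as filename:
        if not silence:
            print("START (logfile: %s)" % filename)


run_action(silence=True)   # prints nothing
run_action(silence=False)  # prints: START (logfile: job.log)

Since Python 3.7, contextlib.nullcontext() expresses the same intent more directly and can also supply a substitute enter result.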


[buildstream] 03/12: fetchqueue.py: Don't skip elements with a cached failure

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fb47c892f072fed1eb0870d5fc84ed88fec48276
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Sep 17 17:44:50 2020 +0200

    fetchqueue.py: Don't skip elements with a cached failure
    
    The build queue requires the sources to be available for all elements
    where `_cached_success()` returns `False`. This includes elements with a
    cached failure.
---
 src/buildstream/_scheduler/queues/fetchqueue.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 18bf392..3a4183d 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -50,7 +50,7 @@ class FetchQueue(Queue):
             if not element._can_query_cache():
                 return QueueStatus.PENDING
 
-            if element._cached():
+            if element._cached_success():
                 return QueueStatus.SKIP
 
         # This will automatically skip elements which
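
The distinction this fix hinges on: an element with a cached failure still counts as "cached", but it is not a cached success, and a rebuild of it needs its sources. A toy illustration with a stand-in class (not the real Element API):

class StubElement:
    def __init__(self, cached, success):
        self._is_cached = cached
        self._success = success

    def _cached(self):
        return self._is_cached

    def _cached_success(self):
        return self._is_cached and self._success


failed_build = StubElement(cached=True, success=False)
assert failed_build._cached()              # old check: fetch wrongly skipped
assert not failed_build._cached_success()  # new check: fetch stays ready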


[buildstream] 12/12: test

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a0637f4172676d0b4b59377e79601807c2966c8a
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 22 18:14:16 2020 +0100

    test
---
 src/buildstream/_stream.py | 21 ++++++++++++++++-----
 1 file changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6c4b2d9..72bc9f5 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -176,11 +176,22 @@ class Stream:
             # Enqueue complete build plan as this is required to determine `buildable` status.
             plan = list(self._pipeline.dependencies(elements, _Scope.ALL))
 
-            self._scheduler.clear_queues()
-            self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
-            self._enqueue_plan(plan)
-            self._run()
-            self._scheduler.clear_queues()
+            for element in plan:
+                if not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+                    element._load_artifact(pull=False)
+                    if not element._can_query_cache() or not element._cached_success():
+                        element._query_source_cache()
+                    if not element._pull_pending():
+                        element._load_artifact_done()
+                else:
+                    element._query_source_cache()
+
+            if False:
+                self._scheduler.clear_queues()
+                self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
+                self._enqueue_plan(plan)
+                self._run()
+                self._scheduler.clear_queues()
 
     # shell()
     #


[buildstream] 06/12: _elementsources.py: Make cache query explicit

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 75df8f827462a1e966be40dd5575941784baaa85
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Sep 17 11:04:15 2020 +0200

    _elementsources.py: Make cache query explicit
    
    Cache query can be fairly expensive as it checks the presence of all
    blobs. Make this more explicit with a `query_cache()` method, instead of
    implicitly querying the cache on the first call to `cached()`.
---
 src/buildstream/_elementsources.py  | 31 ++++++++++++++++++++++++++-----
 src/buildstream/_frontend/widget.py |  2 ++
 src/buildstream/element.py          | 10 ++++++++++
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a15d20e..9b4afe4 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -293,7 +293,7 @@ class ElementSources:
         length = min(len(key), context.log_key_length)
         return key[:length]
 
-    # cached():
+    # query_cache():
     #
     # Check if the element sources are cached in CAS, generating the source
     # cache keys if needed.
@@ -301,10 +301,7 @@ class ElementSources:
     # Returns:
     #    (bool): True if the element sources are cached
     #
-    def cached(self):
-        if self._cached is not None:
-            return self._cached
-
+    def query_cache(self):
         cas = self._context.get_cascache()
         elementsourcescache = self._elementsourcescache
 
@@ -321,6 +318,28 @@ class ElementSources:
         self._cached = True
         return True
 
+    # can_query_cache():
+    #
+    # Returns whether the cache status is available.
+    #
+    # Returns:
+    #    (bool): True if cache status is available
+    #
+    def can_query_cache(self):
+        return self._cached is not None
+
+    # cached()
+    #
+    # Return whether the element sources are cached in CAS. This must be
+    # called only when all sources are resolved.
+    #
+    # Returns:
+    #    (bool): True if the element sources are cached
+    #
+    def cached(self):
+        assert self._cached is not None
+        return self._cached
+
     # is_resolved():
     #
     # Get whether all sources of the element are resolved
@@ -368,6 +387,8 @@ class ElementSources:
         unique_key = self.get_unique_key()
         self._cache_key = _cachekey.generate_key(unique_key)
 
+        self.query_cache()
+
     # preflight():
     #
     # An internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index d289ef2..ad6c813 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -363,6 +363,8 @@ class LogLine(Widget):
                         line = p.fmt_subst(line, "state", "failed", fg="red")
                     elif element._cached_success():
                         line = p.fmt_subst(line, "state", "cached", fg="magenta")
+                    elif not element._can_query_source_cache():
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._fetch_needed():
                         line = p.fmt_subst(line, "state", "fetch needed", fg="red")
                     elif element._buildable():
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 0f2a01c..c4f0479 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1296,6 +1296,16 @@ class Element(Plugin):
         # cache cannot be queried until strict cache key is available
         return self.__artifact is not None
 
+    # _can_query_source_cache():
+    #
+    # Returns whether the source cache status is available.
+    #
+    # Returns:
+    #    (bool): True if source cache can be queried
+    #
+    def _can_query_source_cache(self):
+        return self.__sources.can_query_cache()
+
     # _initialize_state()
     #
     # Compute the element's initial state. Element state contains
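
The shape of this change is a tri-state cache flag with an explicit, potentially expensive query step. A condensed sketch of the pattern, where check_blobs_present() stands in for the real CAS lookup:

class SourcesState:
    def __init__(self):
        self._cached = None  # tri-state: None means "not queried yet"

    def query_cache(self):
        # The expensive step: verify every source blob is present.
        self._cached = self.check_blobs_present()
        return self._cached

    def can_query_cache(self):
        # Cheap: has the expensive query already been performed?
        return self._cached is not None

    def cached(self):
        # Fail loudly rather than querying implicitly on first access.
        assert self._cached is not None
        return self._cached

    def check_blobs_present(self):
        return True  # stand-in


state = SourcesState()
assert not state.can_query_cache()
state.query_cache()
assert state.can_query_cache() and state.cached()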


[buildstream] 05/12: _artifact.py: Make cache query explicit

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit fb36a74815ae858c7e7623bfee45b931aad6b9dc
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Sep 15 11:07:10 2020 +0200

    _artifact.py: Make cache query explicit
    
    Cache query can be fairly expensive as it checks the presence of all
    artifact blobs. Make this more explicit with a `query_cache()` method,
    instead of implicitly querying the cache on the first call to
    `cached()`.
---
 src/buildstream/_artifact.py        | 29 ++++++++++++++---------------
 src/buildstream/_frontend/widget.py |  2 ++
 src/buildstream/element.py          | 16 +++++++++-------
 3 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index d4a716f..b63cff6 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -540,7 +540,7 @@ class Artifact:
 
         return dependency_refs
 
-    # cached():
+    # query_cache():
     #
     # Check whether the artifact corresponding to the stored cache key is
     # available. This also checks whether all required parts of the artifact
@@ -550,11 +550,7 @@ class Artifact:
     # Returns:
     #     (bool): Whether artifact is in local cache
     #
-    def cached(self):
-
-        if self._cached is not None:
-            return self._cached
-
+    def query_cache(self):
         context = self._context
 
         artifact = self._load_proto()
@@ -587,6 +583,18 @@ class Artifact:
         self._cached = True
         return True
 
+    # cached()
+    #
+    # Return whether the artifact is available in the local cache. This must
+    # be called after `query_cache()` or `set_cached()`.
+    #
+    # Returns:
+    #     (bool): Whether artifact is in local cache
+    #
+    def cached(self):
+        assert self._cached is not None
+        return self._cached
+
     # cached_logs()
     #
     # Check if the artifact is cached with log files.
@@ -600,15 +608,6 @@ class Artifact:
         # If the artifact is cached, its log files are available as well.
         return self._element._cached()
 
-    # reset_cached()
-    #
-    # Allow the Artifact to query the filesystem to determine whether it
-    # is cached or not.
-    #
-    def reset_cached(self):
-        self._proto = None
-        self._cached = None
-
     # set_cached()
     #
     # Mark the artifact as cached without querying the filesystem.
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 0d5379f..d289ef2 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -357,6 +357,8 @@ class LogLine(Widget):
                 else:
                     if element.get_kind() == "junction":
                         line = p.fmt_subst(line, "state", "junction", fg="magenta")
+                    elif not element._can_query_cache():
+                        line = p.fmt_subst(line, "state", "unknown", fg="bright_black")
                     elif element._cached_failure():
                         line = p.fmt_subst(line, "state", "failed", fg="red")
                     elif element._cached_success():
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 945ca5c..0f2a01c 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1179,9 +1179,6 @@ class Element(Plugin):
     #            the artifact cache
     #
     def _cached(self):
-        if not self.__artifact:
-            return False
-
         return self.__artifact.cached()
 
     # _cached_remotely():
@@ -1297,7 +1294,7 @@ class Element(Plugin):
     #
     def _can_query_cache(self):
         # cache cannot be queried until strict cache key is available
-        return self.__strict_cache_key is not None
+        return self.__artifact is not None
 
     # _initialize_state()
     #
@@ -1585,7 +1582,9 @@ class Element(Plugin):
     def __should_schedule(self):
         # We're processing if we're already scheduled, we've
         # finished assembling or if we're waiting to pull.
-        processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
+        processing = (
+            self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
+        )
 
         # We should schedule a build when
         return (
@@ -1651,7 +1650,7 @@ class Element(Plugin):
             self.__artifact.set_cached()
             self.__cached_successfully = True
         else:
-            self.__artifact.reset_cached()
+            self.__artifact.query_cache()
 
         # When we're building in non-strict mode, we may have
         # assembled everything to this point without a strong cache
@@ -1898,7 +1897,7 @@ class Element(Plugin):
 
         # Artifact may become cached after pulling, so let it query the
         # filesystem again to check
-        self.__artifact.reset_cached()
+        self.__artifact.query_cache()
 
         # We may not have actually pulled an artifact - the pull may
         # have failed. We might therefore need to schedule assembly.
@@ -2570,6 +2569,7 @@ class Element(Plugin):
             return None
 
         artifact = Artifact(self, self._get_context(), strong_key=workspace.last_build)
+        artifact.query_cache()
 
         if not artifact.cached():
             return None
@@ -3279,12 +3279,14 @@ class Element(Plugin):
             strict_key=self.__strict_cache_key,
             weak_key=self.__weak_cache_key,
         )
+        strict_artifact.query_cache()
         if context.get_strict() or strict_artifact.cached():
             self.__artifact = strict_artifact
         else:
             self.__artifact = Artifact(
                 self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
             )
+            self.__artifact.query_cache()
 
         if not context.get_strict() and self.__artifact.cached():
             # In non-strict mode, strong cache key becomes available when
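
For artifacts the same tri-state idea has two legitimate ways of settling the state: query_cache() asks the filesystem (now also called after a pull, replacing the removed reset_cached()), while set_cached() records a freshly built artifact without touching the filesystem. A sketch under those assumptions:

class ArtifactState:
    def __init__(self):
        self._cached = None

    def query_cache(self):
        # Ask CAS/the filesystem whether all required blobs are present;
        # used at load time and again after a pull completes.
        self._cached = self._check_filesystem()

    def set_cached(self):
        # The artifact was just built and committed, so it is cached by
        # construction; no filesystem query is needed.
        self._cached = True

    def cached(self):
        assert self._cached is not None, "query_cache() or set_cached() first"
        return self._cached

    def _check_filesystem(self):
        return False  # stand-in for the real presence check


artifact = ArtifactState()
artifact.set_cached()
assert artifact.cached()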


[buildstream] 09/12: Call _initialize_state() in Element._new_from_load_element()

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c191ad86da2e373f1e49271e0b898776d9b3b1c5
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Oct 8 17:09:00 2020 +0200

    Call _initialize_state() in Element._new_from_load_element()
    
    With the cache queries moved to job threads, `_initialize_state()` is
    fairly lightweight and can be called earlier.
---
 src/buildstream/_loader/loadelement.pyx |  1 -
 src/buildstream/_loader/loader.py       |  1 -
 src/buildstream/_pipeline.py            | 30 ------------------------------
 src/buildstream/_stream.py              |  1 -
 src/buildstream/element.py              |  4 ++++
 tests/artifactcache/push.py             |  7 -------
 tests/sourcecache/fetch.py              |  6 ------
 tests/sourcecache/push.py               |  2 --
 tests/sourcecache/staging.py            |  3 ---
 9 files changed, 4 insertions(+), 51 deletions(-)

diff --git a/src/buildstream/_loader/loadelement.pyx b/src/buildstream/_loader/loadelement.pyx
index 210869e..f69e138 100644
--- a/src/buildstream/_loader/loadelement.pyx
+++ b/src/buildstream/_loader/loadelement.pyx
@@ -286,7 +286,6 @@ cdef class LoadElement:
             from ..element import Element
 
             element = Element._new_from_load_element(self)
-            element._initialize_state()
 
             # Custom error for link dependencies, since we don't completely
             # parse their dependencies we cannot rely on the built-in ElementError.
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 3d0fb65..bf9e819 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -816,7 +816,6 @@ class Loader:
             )
 
         element = Element._new_from_load_element(load_element)
-        element._initialize_state()
 
         # Handle the case where a subproject has no ref
         #
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index 7aec985..39f5683 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -73,36 +73,6 @@ class Pipeline:
 
             return tuple(element_groups)
 
-    # resolve_elements()
-    #
-    # Resolve element state and cache keys.
-    #
-    # Args:
-    #    targets (list of Element): The list of toplevel element targets
-    #
-    def resolve_elements(self, targets):
-        with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
-            # We need to go through the project to access the loader
-            if task:
-                task.set_maximum_progress(self._project.loader.loaded)
-
-            # XXX: Now that Element._update_state() can trigger recursive update_state calls
-            # it is possible that we could get a RecursionError. However, this is unlikely
-            # to happen, even for large projects (tested with the Debian stack). Although,
-            # if it does become a problem we may have to set the recursion limit to a
-            # greater value.
-            for element in self.dependencies(targets, _Scope.ALL):
-                # Determine initial element state.
-                element._initialize_state()
-
-                # We may already have Elements which are cached and have their runtimes
-                # cached, if this is the case, we should immediately notify their reverse
-                # dependencies.
-                element._update_ready_for_runtime_and_cached()
-
-                if task:
-                    task.add_current_progress()
-
     # check_remotes()
     #
     # Check if the target artifact is cached in any of the available remotes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0558a12..6c4b2d9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1359,7 +1359,6 @@ class Stream:
 
         # Now move on to loading primary selection.
         #
-        self._pipeline.resolve_elements(self.targets)
         selected = self._pipeline.get_selection(self.targets, selection, silent=False)
         selected = self._pipeline.except_elements(self.targets, selected, except_elements)
 
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 7dcbc32..d9445a4 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1132,6 +1132,8 @@ class Element(Plugin):
 
         element.__preflight()
 
+        element._initialize_state()
+
         if task:
             task.add_current_progress()
 
@@ -2885,6 +2887,8 @@ class Element(Plugin):
         self.__strict_cache_key = artifact.strict_key
         self.__weak_cache_key = artifact.weak_key
 
+        self._initialize_state()
+
     @classmethod
     def __compose_default_splits(cls, project, defaults, first_pass):
 
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 17ad2e2..cd9930e 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -6,7 +6,6 @@ import os
 import pytest
 
 from buildstream import _yaml
-from buildstream.types import _Scope
 from buildstream._project import Project
 from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
 from buildstream.testing import cli  # pylint: disable=unused-import
@@ -33,12 +32,6 @@ def _push(cli, cache_dir, project_dir, config_file, target):
         # Create a local artifact cache handle
         artifactcache = context.artifactcache
 
-        # Ensure the element's artifact memeber is initialised
-        # This is duplicated from Pipeline.resolve_elements()
-        # as this test does not use the cli frontend.
-        for e in element._dependencies(_Scope.ALL):
-            e._initialize_state()
-
         # Manually setup the CAS remotes
         artifactcache.setup_remotes(use_config=True)
         artifactcache.initialize_remotes()
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 40076e4..04ed4ee 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -74,7 +74,6 @@ def test_source_fetch(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
@@ -115,7 +114,6 @@ def test_source_fetch(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
 
             # check that we have the source in the cas now and it's not fetched
             element._query_source_cache()
@@ -136,7 +134,6 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
@@ -155,7 +152,6 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
 
             # Check that the source is both in the source dir and the local CAS
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert element._cached_sources()
 
@@ -172,7 +168,6 @@ def test_pull_fail(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
@@ -205,7 +200,6 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements([element_name])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index aa703de..9a06a88 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -84,7 +84,6 @@ def test_source_push_split(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements(["push.bst"])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
@@ -135,7 +134,6 @@ def test_source_push(cli, tmpdir, datafiles):
             project.ensure_fully_loaded()
 
             element = project.load_elements(["push.bst"])[0]
-            element._initialize_state()
             element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index e0e7002..6671c79 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -64,7 +64,6 @@ def test_source_staged(tmpdir, cli, datafiles):
         # now check that the source is in the refs file, this is pretty messy but
         # seems to be the only way to get the sources?
         element = project.load_elements(["import-bin.bst"])[0]
-        element._initialize_state()
         element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
@@ -100,7 +99,6 @@ def test_source_fetch(tmpdir, cli, datafiles):
         sourcecache = context.sourcecache
 
         element = project.load_elements(["import-dev.bst"])[0]
-        element._initialize_state()
         element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
@@ -135,7 +133,6 @@ def test_staged_source_build(tmpdir, datafiles, cli):
         project.ensure_fully_loaded()
 
         element = project.load_elements(["import-dev.bst"])[0]
-        element._initialize_state()
 
         # check consistency of the source
         element._query_source_cache()
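
The call-site simplification amounts to folding the second initialization step into the factory method, which is only safe now that the step is cheap. A toy sketch with a stand-in class:

class MiniElement:
    def __init__(self):
        self._state_ready = False

    @classmethod
    def _new_from_load_element(cls, load_element):
        element = cls()
        # Cache queries now run in job threads, so initializing state
        # here is lightweight and every creation site gets it for free.
        element._initialize_state()
        return element

    def _initialize_state(self):
        self._state_ready = True


element = MiniElement._new_from_load_element(load_element=None)
assert element._state_ready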


[buildstream] 08/12: Move artifact and source cache query to a job thread

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ca909d9c41f074858dd4d24d9b3cc226b7a1c3f8
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Dec 14 19:43:40 2020 +0100

    Move artifact and source cache query to a job thread
    
    This allows parallelization of cache queries.
---
 src/buildstream/_elementsources.py                 |  5 +-
 src/buildstream/_frontend/cli.py                   |  2 +
 src/buildstream/_loader/loader.py                  |  1 +
 src/buildstream/_scheduler/__init__.py             |  1 +
 .../_scheduler/queues/cachequeryqueue.py           | 66 ++++++++++++++++++++++
 src/buildstream/_scheduler/queues/fetchqueue.py    |  2 +-
 src/buildstream/_scheduler/queues/pullqueue.py     |  4 --
 src/buildstream/_stream.py                         | 44 +++++++++++++++
 src/buildstream/element.py                         | 10 +---
 tests/artifactcache/push.py                        |  3 +
 tests/frontend/fetch.py                            |  8 +--
 tests/frontend/track.py                            | 12 ++--
 tests/sourcecache/fetch.py                         |  6 ++
 tests/sourcecache/push.py                          |  2 +
 tests/sourcecache/staging.py                       |  3 +
 tests/sources/git.py                               |  6 +-
 16 files changed, 151 insertions(+), 24 deletions(-)

diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index 9b4afe4..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@ class ElementSources:
     #    SourceError: If one of the element sources has an error
     #
     def fetch(self):
+        if self._cached is None:
+            self.query_cache()
+
         if self.cached():
             return
 
@@ -387,8 +390,6 @@ class ElementSources:
         unique_key = self.get_unique_key()
         self._cache_key = _cachekey.generate_key(unique_key)
 
-        self.query_cache()
-
     # preflight():
     #
     # A internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index ab06e8a..cad1ee0 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -551,6 +551,8 @@ def show(app, elements, deps, except_, order, format_):
 
         dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
 
+        app.stream.query_cache(dependencies)
+
         if order == "alpha":
             dependencies = sorted(dependencies)
 
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 6ace362..3d0fb65 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -830,6 +830,7 @@ class Loader:
 
         # Handle the case where a subproject needs to be fetched
         #
+        element._query_source_cache()
         if element._should_fetch():
             self.load_context.fetch_subprojects([element])
 
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..fcde00d 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -25,6 +25,7 @@ from .queues.trackqueue import TrackQueue
 from .queues.buildqueue import BuildQueue
 from .queues.artifactpushqueue import ArtifactPushQueue
 from .queues.pullqueue import PullQueue
+from .queues.cachequeryqueue import CacheQueryQueue
 
 from .scheduler import Scheduler, SchedStatus
 from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/queues/cachequeryqueue.py b/src/buildstream/_scheduler/queues/cachequeryqueue.py
new file mode 100644
index 0000000..f447ab5
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/cachequeryqueue.py
@@ -0,0 +1,66 @@
+#
+#  Copyright (C) 2020 Bloomberg Finance LP
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+from ...types import _KeyStrength
+
+
+# A queue which queries the cache for artifacts and sources
+#
+class CacheQueryQueue(Queue):
+
+    action_name = "Cache-query"
+    complete_name = "Cache queried"
+    resources = [ResourceType.PROCESS, ResourceType.CACHE]
+
+    def __init__(self, scheduler, *, sources=False):
+        super().__init__(scheduler)
+
+        self._query_sources = sources
+
+    def get_process_func(self):
+        if not self._query_sources:
+            return CacheQueryQueue._query_artifacts_or_sources
+        else:
+            return CacheQueryQueue._query_sources
+
+    def status(self, element):
+        if not element._get_cache_key(strength=_KeyStrength.WEAK):
+            # Strict and weak cache keys are unavailable if the element or
+            # a dependency has an unresolved source
+            return QueueStatus.SKIP
+
+        return QueueStatus.READY
+
+    def done(self, _, element, result, status):
+        if status is JobStatus.FAIL:
+            return
+
+        if not self._query_sources:
+            if not element._pull_pending():
+                element._load_artifact_done()
+
+    @staticmethod
+    def _query_artifacts_or_sources(element):
+        element._load_artifact(pull=False)
+        if not element._can_query_cache() or not element._cached_success():
+            element._query_source_cache()
+
+    @staticmethod
+    def _query_sources(element):
+        element._query_source_cache()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 3a4183d..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -56,7 +56,7 @@ class FetchQueue(Queue):
         # This will automatically skip elements which
         # have no sources.
 
-        if not element._should_fetch(self._should_fetch_original):
+        if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
             return QueueStatus.SKIP
 
         return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index ecff02c..9860256 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -43,10 +43,6 @@ class PullQueue(Queue):
             return QueueStatus.SKIP
 
     def done(self, _, element, result, status):
-
-        if status is JobStatus.FAIL:
-            return
-
         element._load_artifact_done()
 
     @staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e05100f..0558a12 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -37,6 +37,7 @@ from ._scheduler import (
     Scheduler,
     SchedStatus,
     TrackQueue,
+    CacheQueryQueue,
     FetchQueue,
     SourcePushQueue,
     BuildQueue,
@@ -162,6 +163,25 @@ class Stream:
 
             return target_objects
 
+    # query_cache()
+    #
+    # Query the artifact and source caches to determine the cache status
+    # of the specified elements.
+    #
+    # Args:
+    #    elements (list of Element): The elements to check
+    #
+    def query_cache(self, elements, *, sources=False):
+        with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+            # Enqueue complete build plan as this is required to determine `buildable` status.
+            plan = list(self._pipeline.dependencies(elements, _Scope.ALL))
+
+            self._scheduler.clear_queues()
+            self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
+            self._enqueue_plan(plan)
+            self._run()
+            self._scheduler.clear_queues()
+
     # shell()
     #
     # Run a shell
@@ -208,6 +228,8 @@ class Stream:
             element = self.targets[0]
             element._set_required(scope)
 
+            self.query_cache([element] + elements)
+
             if pull_:
                 self._scheduler.clear_queues()
                 self._add_queue(PullQueue(self._scheduler))
@@ -244,6 +266,7 @@ class Stream:
 
         # Ensure we have our sources if we are launching a build shell
         if scope == _Scope.BUILD and not usebuildtree:
+            self.query_cache([element], sources=True)
             self._fetch([element])
             self._pipeline.assert_sources_cached([element])
 
@@ -294,6 +317,8 @@ class Stream:
                 for element in self.targets:
                     element._set_artifact_files_required(scope=scope)
 
+        self.query_cache(elements)
+
         # Now construct the queues
         #
         self._scheduler.clear_queues()
@@ -339,6 +364,8 @@ class Stream:
             source_remote_url=remote,
         )
 
+        self.query_cache(elements, sources=True)
+
         # Delegated to a shared fetch method
         self._fetch(elements, announce_session=True)
 
@@ -403,6 +430,8 @@ class Stream:
             load_artifacts=True,
         )
 
+        self.query_cache(elements, sources=True)
+
         if not self._sourcecache.has_push_remotes():
             raise StreamError("No source caches available for pushing sources")
 
@@ -447,6 +476,9 @@ class Stream:
             raise StreamError("No artifact caches available for pulling artifacts")
 
         self._pipeline.assert_consistent(elements)
+
+        self.query_cache(elements)
+
         self._scheduler.clear_queues()
         self._add_queue(PullQueue(self._scheduler))
         self._enqueue_plan(elements)
@@ -489,6 +521,8 @@ class Stream:
 
         self._pipeline.assert_consistent(elements)
 
+        self.query_cache(elements)
+
         self._scheduler.clear_queues()
         self._add_queue(PullQueue(self._scheduler))
         self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -539,6 +573,8 @@ class Stream:
 
         self._check_location_writable(location, force=force, tar=tar)
 
+        self.query_cache(elements)
+
         uncached_elts = [elt for elt in elements if not elt._cached()]
         if uncached_elts and pull:
             self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -617,6 +653,8 @@ class Stream:
             targets, selection=selection, use_artifact_config=True, load_artifacts=True
         )
 
+        self.query_cache(target_objects)
+
         if self._artifacts.has_fetch_remotes():
             self._pipeline.check_remotes(target_objects)
 
@@ -636,6 +674,8 @@ class Stream:
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         artifact_logs = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -664,6 +704,8 @@ class Stream:
         # Return list of Element and/or ArtifactElement objects
         target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
 
+        self.query_cache(target_objects)
+
         elements_to_files = {}
         for obj in target_objects:
             ref = obj.get_artifact_name()
@@ -742,6 +784,7 @@ class Stream:
         elements = self._load((target,), selection=deps, except_targets=except_targets)
 
         # Assert all sources are cached in the source dir
+        self.query_cache(elements, sources=True)
         self._fetch(elements)
         self._pipeline.assert_sources_cached(elements)
 
@@ -775,6 +818,7 @@ class Stream:
         # If we're going to checkout, we need at least a fetch,
         #
         if not no_checkout:
+            self.query_cache(elements, sources=True)
             self._fetch(elements, fetch_original=True)
 
         expanded_directories = []
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index f5f8a71..7dcbc32 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1945,6 +1945,9 @@ class Element(Plugin):
         self.__artifact = artifact
         return pulled
 
+    def _query_source_cache(self):
+        self.__sources.query_cache()
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -3256,13 +3259,6 @@ class Element(Plugin):
             # In strict mode, the strong cache key always matches the strict cache key
             self.__cache_key = self.__strict_cache_key
 
-        # If we've newly calculated a cache key, our artifact's
-        # current state will also change - after all, we can now find
-        # a potential existing artifact.
-        self._load_artifact(pull=False)
-        if not self._pull_pending():
-            self._load_artifact_done()
-
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 74062ce..17ad2e2 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -43,6 +43,9 @@ def _push(cli, cache_dir, project_dir, config_file, target):
         artifactcache.setup_remotes(use_config=True)
         artifactcache.initialize_remotes()
 
+        # Query local cache
+        element._load_artifact(pull=False)
+
         assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
         assert element._push(), "Push operation failed"
 
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states):
 def test_fetch_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # When the error occurs outside of the scheduler at load time,
-    # then the SourceError is reported directly as the main error.
     result = cli.run(project=project, args=["source", "fetch", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
     result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    result.assert_main_error(ErrorDomain.STREAM, None)
+    result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index bd84449..d1a9324 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag
 def test_track_consistency_error(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing a consistency error
+    # Track the element causing a consistency error in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "error.bst"])
-    result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
 def test_track_consistency_bug(cli, datafiles):
     project = str(datafiles)
 
-    # Track the element causing an unhandled exception
+    # Track the element causing an unhandled exception in `is_cached()`
     result = cli.run(project=project, args=["source", "track", "bug.bst"])
 
-    # We expect BuildStream to fail gracefully, with no recorded exception.
-    result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+    # We expect tracking to succeed as `is_cached()` is not required for tracking.
+    result.assert_success()
 
 
 @pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 76f5508..40076e4 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
             element._initialize_state()
 
             # check that we have the source in the cas now and it's not fetched
+            element._query_source_cache()
             assert element._cached_sources()
             assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
 
@@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
             # Check that the source is both in the source dir and the local CAS
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert element._cached_sources()
 
 
@@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
 
             element = project.load_elements([element_name])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 25a4309..aa703de 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -85,6 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
 
             element = project.load_elements(["push.bst"])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
@@ -135,6 +136,7 @@ def test_source_push(cli, tmpdir, datafiles):
 
             element = project.load_elements(["push.bst"])[0]
             element._initialize_state()
+            element._query_source_cache()
             assert not element._cached_sources()
             source = list(element.sources())[0]
 
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..e0e7002 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles):
         # seems to be the only way to get the sources?
         element = project.load_elements(["import-bin.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
         assert sourcecache.contains(source)
@@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
 
         element = project.load_elements(["import-dev.bst"])[0]
         element._initialize_state()
+        element._query_source_cache()
         source = list(element.sources())[0]
         assert element._cached_sources()
 
@@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
         element._initialize_state()
 
         # check consistency of the source
+        element._query_source_cache()
         assert not element._cached_sources()
 
     res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
     result = cli.run(project=project, args=["source", "fetch", "target.bst"])
     result.assert_success()
 
-    # Track will encounter an inconsistent submodule without any ref
+    # Track to update to the offending commit
     result = cli.run(project=project, args=["source", "track", "target.bst"])
     result.assert_success()
 
+    # Fetch after track will encounter an inconsistent submodule without any ref
+    result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+    result.assert_success()
+
     # Assert that we are just fine without it, and emit a warning to the user.
     assert "Ignoring inconsistent submodule" in result.stderr
 


[buildstream] 11/12: pipe hack

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 9068295c9a73197a5e49b9b83a8648850c0961af
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 15 07:44:12 2020 +0100

    pipe hack
---
 src/buildstream/_scheduler/jobs/job.py | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c7e2624..7579d24 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -145,10 +145,15 @@ class Job:
         assert not self._terminated, "Attempted to start process which was already terminated"
 
         # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
+        silence = self.action_name == "Cache-query"
+        if silence:
+            self._pipe_r = pipe_w = None
+        else:
+            self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
 
         self._tries += 1
-        self._parent_start_listening()
+        if not silence:
+            self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -350,7 +355,8 @@ class Job:
         self._scheduler.job_completed(self, status)
 
         # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
+        if self._pipe_r:
+            self._pipe_r.close()
         self._pipe_r = self._task = None
 
     # _parent_process_pipe()
@@ -359,6 +365,8 @@ class Job:
     # in the parent process.
     #
     def _parent_process_pipe(self):
+        if not self._pipe_r:
+            return
         while self._pipe_r.poll():
             try:
                 message = self._pipe_r.recv()
@@ -581,7 +589,8 @@ class ChildJob:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
             finally:
-                self._pipe_w.close()
+                if self._pipe_w:
+                    self._pipe_w.close()
 
     # terminate()
     #
@@ -620,6 +629,8 @@ class ChildJob:
     #    is_silenced (bool)   : Whether messages are silenced
     #
     def _child_message_handler(self, message, is_silenced):
+        if self.action_name == "Cache-query":
+            return
 
         message.action_name = self.action_name
         message.task_element_name = self._message_element_name
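
The guard pattern in this commit is simple: for silenced jobs the message pipe is never created, and every later use must tolerate its absence. A runnable sketch:

import multiprocessing


def run_job(silence):
    if silence:
        pipe_r = pipe_w = None  # no messages will ever flow
    else:
        pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)

    # ... the job would write messages to pipe_w here ...

    # Every teardown path checks for the missing pipe.
    if pipe_w:
        pipe_w.close()
    if pipe_r:
        pipe_r.close()


run_job(silence=True)
run_job(silence=False)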


[buildstream] 04/12: _pipeline.py: Drop the optimization for cached elements in the planner

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit afd578ba876b608cb88b875b688c611bdde5eec4
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Wed Sep 16 15:55:26 2020 +0200

    _pipeline.py: Drop the optimization for cached elements in the planner
    
    The overhead of planning already cached elements and unneeded build-only
    dependencies should be fairly small as unneeded jobs can still be
    skipped. This optimization was also already disabled for non-strict
    build plans with a remote artifact cache.
    
    This change is necessary in preparation for parallelizing cache queries.
---
 src/buildstream/_pipeline.py | 26 +++++++++-----------------
 1 file changed, 9 insertions(+), 17 deletions(-)

diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index d53fc9d..7aec985 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -141,9 +141,8 @@ class Pipeline:
     # plan()
     #
     # Generator function to iterate over only the elements
-    # which are required to build the pipeline target, omitting
-    # cached elements. The elements are yielded in a depth sorted
-    # ordering for optimal build plans
+    # which are required to build the pipeline target. The elements are
+    # yielded in a depth sorted ordering for optimal build plans
     #
     # Args:
     #    elements (list of Element): List of target elements to plan
@@ -152,11 +151,7 @@ class Pipeline:
     #    (list of Element): A depth sorted list of the build plan
     #
     def plan(self, elements):
-        # Keep locally cached elements in the plan if remote artifact cache is used
-        # to allow pulling artifact with strict cache key, if available.
-        plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes()
-
-        return _Planner().plan(elements, plan_cached)
+        return _Planner().plan(elements)
 
     # get_selection()
     #
@@ -421,9 +416,8 @@ class Pipeline:
 # _Planner()
 #
 # An internal object used for constructing build plan
-# from a given resolved toplevel element, while considering what
-# parts need to be built depending on build only dependencies
-# being cached, and depth sorting for more efficient processing.
+# from a given resolved toplevel element, using depth
+# sorting for more efficient processing.
 #
 class _Planner:
     def __init__(self):
@@ -447,15 +441,13 @@ class _Planner:
         for dep in element._dependencies(_Scope.RUN, recurse=False):
             self.plan_element(dep, depth)
 
-        # Dont try to plan builds of elements that are cached already
-        if not element._cached_success():
-            for dep in element._dependencies(_Scope.BUILD, recurse=False):
-                self.plan_element(dep, depth + 1)
+        for dep in element._dependencies(_Scope.BUILD, recurse=False):
+            self.plan_element(dep, depth + 1)
 
         self.depth_map[element] = depth
         self.visiting_elements.remove(element)
 
-    def plan(self, roots, plan_cached):
+    def plan(self, roots):
         for root in roots:
             self.plan_element(root, 0)
 
@@ -465,4 +457,4 @@ class _Planner:
         for index, item in enumerate(depth_sorted):
             item[0]._set_depth(index)
 
-        return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+        return [item[0] for item in depth_sorted]


[buildstream] 07/12: element.py: Combine cache query and pull into `_load_artifact()`

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 515eab747f44f4f19d0598d6b8e7cd9a339d52ca
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Dec 14 21:36:08 2020 +0100

    element.py: Combine cache query and pull into `_load_artifact()`
    
    In non-strict mode, cache query and pull are intertwined, as we prefer
    pulling the strict artifact over a cache lookup with the weak cache key.
    
    This replaces `__update_artifact_state()` and `_pull()` with a combined
    `_load_artifact()` method, which supports cache query with deferred
    pulling. This keeps behavior correct while allowing cache query and
    pull to run either split or combined.
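    
    As a rough sketch of the combined flow (workspace and buildtree
    handling omitted; `StubArtifact` and the keyword parameters are
    illustrative stand-ins, not the real API):
    
        class StubArtifact:
            def __init__(self, in_local_cache=False, on_remote=False):
                self.in_local_cache = in_local_cache
                self.on_remote = on_remote
    
            def query_cache(self):
                pass  # the real class checks the local CAS here
    
            def cached(self):
                return self.in_local_cache
    
            def pull(self):
                if self.on_remote:
                    self.in_local_cache = True
                    return True
                return False
    
        def load_artifact(strict, weak, *, pull, strict_mode, has_remotes):
            # Phase 1: always start with the strict-key artifact
            strict.query_cache()
    
            pull_pending = False
            if not pull and not strict.cached() and has_remotes:
                # Query-only invocation: defer the download to a pull job
                pull_pending = True
    
            pulled = pull and strict.pull()
            if strict.cached() or strict_mode:
                return strict, pulled, pull_pending
            if pull_pending:
                return None, False, True
    
            # Phase 2, non-strict mode only: retry with the weak cache key
            weak.query_cache()
            pulled = pull and weak.pull()
            return weak, pulled, False
    
        # Query without pull: nothing local, a remote exists, so pulling
        # is reported as pending and the artifact stays undecided
        artifact, pulled, pending = load_artifact(
            StubArtifact(on_remote=True), StubArtifact(),
            pull=False, strict_mode=False, has_remotes=True)
        assert artifact is None and pending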
---
 src/buildstream/_artifact.py                   |   7 +-
 src/buildstream/_frontend/widget.py            |   2 +-
 src/buildstream/_scheduler/queues/pullqueue.py |  13 +-
 src/buildstream/element.py                     | 170 ++++++++++---------------
 4 files changed, 72 insertions(+), 120 deletions(-)

diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index b63cff6..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -591,9 +591,12 @@ class Artifact:
     # Returns:
     #     (bool): Whether artifact is in local cache
     #
-    def cached(self):
+    def cached(self, *, buildtree=False):
         assert self._cached is not None
-        return self._cached
+        ret = self._cached
+        if buildtree:
+            ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+        return ret
 
     # cached_logs()
     #
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index ad6c813..3ef64d3 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -358,7 +358,7 @@ class LogLine(Widget):
                     if element.get_kind() == "junction":
                         line = p.fmt_subst(line, "state", "junction", fg="magenta")
                     elif not element._can_query_cache():
-                        line = p.fmt_subst(line, "state", "unknown", fg="bright_black")
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._cached_failure():
                         line = p.fmt_subst(line, "state", "failed", fg="red")
                     elif element._cached_success():
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..ecff02c 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -37,9 +37,6 @@ class PullQueue(Queue):
         return PullQueue._pull_or_skip
 
     def status(self, element):
-        if not element._can_query_cache():
-            return QueueStatus.PENDING
-
         if element._pull_pending():
             return QueueStatus.READY
         else:
@@ -50,15 +47,9 @@ class PullQueue(Queue):
         if status is JobStatus.FAIL:
             return
 
-        element._pull_done()
-
-    def register_pending_element(self, element):
-        # Set a "can_query_cache"_callback for an element which is not
-        # immediately ready to query the artifact cache so that it
-        # may be pulled.
-        element._set_can_query_cache_callback(self._enqueue_element)
+        element._load_artifact_done()
 
     @staticmethod
     def _pull_or_skip(element):
-        if not element._pull():
+        if not element._load_artifact(pull=True):
             raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index c4f0479..f5f8a71 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -290,7 +290,7 @@ class Element(Plugin):
         self.__sourcecache = context.sourcecache  # Source cache
         self.__assemble_scheduled = False  # Element is scheduled to be assembled
         self.__assemble_done = False  # Element is assembled
-        self.__pull_done = False  # Whether pull was attempted
+        self.__pull_pending = False  # Whether pull is pending
         self.__cached_successfully = None  # If the Element is known to be successfully cached
         self.__splits = None  # Resolved regex objects for computing split domains
         self.__whitelist_regex = None  # Resolved regex object to check if file is allowed to overlap
@@ -1332,9 +1332,6 @@ class Element(Plugin):
     #
     # - __update_cache_keys()
     #   - Computes the strong and weak cache keys.
-    # - _update_artifact_state()
-    #   - Computes the state of the element's artifact using the
-    #     cache key.
     # - __schedule_assembly_when_necessary()
     #   - Schedules assembly of an element, iff its current state
     #     allows/necessitates it
@@ -1869,83 +1866,85 @@ class Element(Plugin):
     #   (bool): Whether a pull operation is pending
     #
     def _pull_pending(self):
-        if self._get_workspace():
-            # Workspace builds are never pushed to artifact servers
-            return False
-
-        # Check whether the pull has been invoked with a specific subdir requested
-        # in user context, as to complete a partial artifact
-        pull_buildtrees = self._get_context().pull_buildtrees
-
-        if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
-            if pull_buildtrees:
-                # If we've specified a subdir, check if the subdir is cached locally
-                # or if it's possible to get
-                if self._cached_buildtree() or not self._buildtree_exists():
-                    return False
-            else:
-                return False
+        return self.__pull_pending
 
-        # Pull is pending if artifact remote server available
-        # and pull has not been attempted yet
-        return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
-    # _pull_done()
+    # _load_artifact_done()
     #
-    # Indicate that pull was attempted.
+    # Indicate that `_load_artifact()` has completed.
     #
-    # This needs to be called in the main process after a pull
+    # This needs to be called in the main process after `_load_artifact()`
     # succeeds or fails so that we properly update the main
     # process data model
     #
     # This will result in updating the element state.
     #
-    def _pull_done(self):
+    def _load_artifact_done(self):
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
-        self.__pull_done = True
+        assert self.__artifact
 
-        # Artifact may become cached after pulling, so let it query the
-        # filesystem again to check
-        self.__artifact.query_cache()
+        context = self._get_context()
+
+        if not context.get_strict() and self.__artifact.cached():
+            # In non-strict mode, strong cache key becomes available when
+            # the artifact is cached
+            self.__update_cache_key_non_strict()
 
-        # We may not have actually pulled an artifact - the pull may
-        # have failed. We might therefore need to schedule assembly.
-        self.__schedule_assembly_when_necessary()
-        # If we've finished pulling, an artifact might now exist
-        # locally, so we might need to update a non-strict strong
-        # cache key.
-        self.__update_cache_key_non_strict()
         self._update_ready_for_runtime_and_cached()
 
-    # _pull():
+        self.__schedule_assembly_when_necessary()
+
+        if self.__can_query_cache_callback is not None:
+            self.__can_query_cache_callback(self)
+            self.__can_query_cache_callback = None
+
+    # _load_artifact():
     #
-    # Pull artifact from remote artifact repository into local artifact cache.
+    # Load artifact from cache or pull it from remote artifact repository.
     #
     # Returns: True if the artifact has been downloaded, False otherwise
     #
-    def _pull(self):
+    def _load_artifact(self, *, pull):
         context = self._get_context()
 
-        # Get optional specific subdir to pull and optional list to not pull
-        # based off of user context
-        pull_buildtrees = context.pull_buildtrees
+        pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
 
-        # Attempt to pull artifact without knowing whether it's available
-        strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
-        if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
-            # Notify successful download
-            return True
+        # First check whether we already have the strict artifact in the local cache
+        artifact = Artifact(
+            self,
+            context,
+            strict_key=self.__strict_cache_key,
+            strong_key=self.__strict_cache_key,
+            weak_key=self.__weak_cache_key,
+        )
+        artifact.query_cache()
 
-        if not context.get_strict() and not self._cached():
-            # In non-strict mode also try pulling weak artifact
-            # if no weak artifact is cached yet.
-            artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
-            return artifact.pull(pull_buildtrees=pull_buildtrees)
-        else:
-            # No artifact has been downloaded
+        self.__pull_pending = False
+        if not pull and not artifact.cached(buildtree=pull_buildtrees):
+            if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+                # Artifact is not completely available in cache and artifact remote server is available.
+                # Stop artifact loading here as pull is required to proceed.
+                self.__pull_pending = True
+
+        # Attempt to pull artifact with the strict cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        if artifact.cached() or context.get_strict():
+            self.__artifact = artifact
+            return pulled
+        elif self.__pull_pending:
             return False
 
+        # In non-strict mode retry with weak cache key
+        artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+        artifact.query_cache()
+
+        # Attempt to pull artifact with the weak cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        self.__artifact = artifact
+        return pulled
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -2384,7 +2383,7 @@ class Element(Plugin):
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
         if not self.__ready_for_runtime_and_cached:
-            if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+            if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
                 self.__ready_for_runtime_and_cached = True
 
                 # Notify reverse dependencies
@@ -2871,10 +2870,13 @@ class Element(Plugin):
 
         # Load bits which have been stored on the artifact
         #
+        artifact.query_cache()
         if artifact.cached():
             self.__environment = artifact.load_environment()
             self.__sandbox_config = artifact.load_sandbox_config()
             self.__variables = artifact.load_variables()
+        else:
+            self.__pull_pending = True
 
         self.__cache_key = artifact.strong_key
         self.__strict_cache_key = artifact.strict_key
@@ -3257,58 +3259,14 @@ class Element(Plugin):
         # If we've newly calculated a cache key, our artifact's
         # current state will also change - after all, we can now find
         # a potential existing artifact.
-        self.__update_artifact_state()
+        self._load_artifact(pull=False)
+        if not self._pull_pending():
+            self._load_artifact_done()
 
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
 
-    # __update_artifact_state()
-    #
-    # Updates the data involved in knowing about the artifact corresponding
-    # to this element.
-    #
-    # If the state changes, this will subsequently call
-    # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
-    # possible.
-    #
-    # Element.__update_cache_keys() must be called before this to have
-    # meaningful results, because the element must know its cache key before
-    # it can check whether an artifact exists for that cache key.
-    #
-    def __update_artifact_state(self):
-        assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
-        assert self.__artifact is None
-
-        context = self._get_context()
-
-        strict_artifact = Artifact(
-            self,
-            context,
-            strong_key=self.__strict_cache_key,
-            strict_key=self.__strict_cache_key,
-            weak_key=self.__weak_cache_key,
-        )
-        strict_artifact.query_cache()
-        if context.get_strict() or strict_artifact.cached():
-            self.__artifact = strict_artifact
-        else:
-            self.__artifact = Artifact(
-                self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
-            )
-            self.__artifact.query_cache()
-
-        if not context.get_strict() and self.__artifact.cached():
-            # In non-strict mode, strong cache key becomes available when
-            # the artifact is cached
-            self.__update_cache_key_non_strict()
-
-        self.__schedule_assembly_when_necessary()
-
-        if self.__can_query_cache_callback is not None:
-            self.__can_query_cache_callback(self)
-            self.__can_query_cache_callback = None
-
     # __update_cache_key_non_strict()
     #
     # Calculates the strong cache key if it hasn't already been set.
@@ -3317,7 +3275,7 @@ class Element(Plugin):
     # strict cache key, so no work needs to be done.
     #
     # When buildstream is not run in strict mode, this requires the artifact
-    # state (as set in Element.__update_artifact_state()) to be set accordingly,
+    # state (as set in Element._load_artifact()) to be set accordingly,
     # as the cache key can be loaded from the cache (possibly pulling from
     # a remote cache).
     #


[buildstream] 01/12: tests/frontend/push.py: Allow pushing of dependencies

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 847703c48ad311a74d0028399b52fbd4fff90b56
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Sep 14 11:24:59 2020 +0200

    tests/frontend/push.py: Allow pushing of dependencies
    
    The assertions in `test_push_after_pull` are too strict. Pushing
    dependencies to the second (empty) artifact server should not cause a
    test failure.
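    
    A toy illustration of why membership is the right check: pushing
    dependencies adds extra element names, which an equality assertion
    would reject (the list below is hypothetical):
    
        pushed = ["dep.bst", "target.bst"]
        assert "target.bst" in pushed       # passes even with extra pushes
        # assert pushed == ["target.bst"]   # would fail spuriously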
---
 tests/frontend/push.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index 4e39c22..2106b6d 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -372,8 +372,8 @@ def test_push_after_pull(cli, tmpdir, datafiles):
         #
         result = cli.run(project=project, args=["build", "target.bst"])
         result.assert_success()
-        assert result.get_pulled_elements() == ["target.bst"]
-        assert result.get_pushed_elements() == []
+        assert "target.bst" in result.get_pulled_elements()
+        assert "target.bst" not in result.get_pushed_elements()
 
         # Delete the artifact locally again.
         cli.remove_artifact_from_cache(project, "target.bst")
@@ -383,8 +383,8 @@ def test_push_after_pull(cli, tmpdir, datafiles):
         cli.configure({"artifacts": [{"url": share1.repo, "push": True}, {"url": share2.repo, "push": True},]})
         result = cli.run(project=project, args=["build", "target.bst"])
         result.assert_success()
-        assert result.get_pulled_elements() == ["target.bst"]
-        assert result.get_pushed_elements() == ["target.bst"]
+        assert "target.bst" in result.get_pulled_elements()
+        assert "target.bst" in result.get_pushed_elements()
 
 
 # Ensure that when an artifact's size exceeds available disk space