You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@buildstream.apache.org by ju...@apache.org on 2021/03/02 16:02:22 UTC
[buildstream] 07/08: Move artifact and source cache query trigger to Stream class
This is an automated email from the ASF dual-hosted git repository.
juergbi pushed a commit to branch juerg/cache-query
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit bc4d6e633a972e66eab975cfbf3dbf6197dad304
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Mar 2 11:29:18 2021 +0100
Move artifact and source cache query trigger to Stream class
This is in preparation for parallelization of cache queries.
---
src/buildstream/_elementsources.py | 5 ++-
src/buildstream/_frontend/cli.py | 2 +
src/buildstream/_loader/loader.py | 1 +
src/buildstream/_scheduler/queues/fetchqueue.py | 2 +-
src/buildstream/_scheduler/queues/pullqueue.py | 5 ---
src/buildstream/_stream.py | 50 +++++++++++++++++++++++++
src/buildstream/element.py | 10 ++---
tests/artifactcache/push.py | 3 ++
tests/frontend/fetch.py | 8 ++--
tests/frontend/track.py | 12 +++---
tests/sourcecache/fetch.py | 6 +++
tests/sourcecache/push.py | 2 +
tests/sourcecache/staging.py | 3 ++
tests/sources/git.py | 6 ++-
14 files changed, 90 insertions(+), 25 deletions(-)
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index 9b4afe4..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@ class ElementSources:
# SourceError: If one of the element sources has an error
#
def fetch(self):
+ if self._cached is None:
+ self.query_cache()
+
if self.cached():
return
@@ -387,8 +390,6 @@ class ElementSources:
unique_key = self.get_unique_key()
self._cache_key = _cachekey.generate_key(unique_key)
- self.query_cache()
-
# preflight():
#
# A internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index 6a9a18b..f20d98f 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -606,6 +606,8 @@ def show(app, elements, deps, except_, order, format_):
dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
+ app.stream.query_cache(dependencies)
+
if order == "alpha":
dependencies = sorted(dependencies)
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 48fb1c4..6db2017 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -830,6 +830,7 @@ class Loader:
# Handle the case where a subproject needs to be fetched
#
+ element._query_source_cache()
if element._should_fetch():
self.load_context.fetch_subprojects([element])
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 3a4183d..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -56,7 +56,7 @@ class FetchQueue(Queue):
# This will automatically skip elements which
# have no sources.
- if not element._should_fetch(self._should_fetch_original):
+ if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
return QueueStatus.SKIP
return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index ecff02c..925ded0 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -21,7 +21,6 @@
# Local imports
from . import Queue, QueueStatus
from ..resources import ResourceType
-from ..jobs import JobStatus
from ..._exceptions import SkipJob
@@ -43,10 +42,6 @@ class PullQueue(Queue):
return QueueStatus.SKIP
def done(self, _, element, result, status):
-
- if status is JobStatus.FAIL:
- return
-
element._load_artifact_done()
@staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index a44d613..87bfeed 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -177,6 +177,30 @@ class Stream:
)
return target_objects
+ # query_cache()
+ #
+ # Query the artifact and source caches to determine the cache status
+ # of the specified elements.
+ #
+ # Args:
+ # elements (list of Element): The elements to check
+ # sources (bool): True to only query the source cache
+ #
+ def query_cache(self, elements, *, sources=False):
+ with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+ # Enqueue complete build plan as this is required to determine `buildable` status.
+ plan = list(_pipeline.dependencies(elements, _Scope.ALL))
+
+ for element in plan:
+ if not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+ element._load_artifact(pull=False)
+ if not element._can_query_cache() or not element._cached_success():
+ element._query_source_cache()
+ if not element._pull_pending():
+ element._load_artifact_done()
+ elif element._has_all_sources_resolved():
+ element._query_source_cache()
+
# shell()
#
# Run a shell
@@ -241,6 +265,8 @@ class Stream:
element = self.targets[0]
element._set_required(scope)
+ self.query_cache([element] + elements)
+
if pull_:
self._reset()
self._add_queue(PullQueue(self._scheduler))
@@ -280,6 +306,7 @@ class Stream:
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
+ self.query_cache([element], sources=True)
self._fetch([element])
_pipeline.assert_sources_cached(self._context, [element])
@@ -342,6 +369,8 @@ class Stream:
for element in self.targets:
element._set_artifact_files_required(scope=scope)
+ self.query_cache(elements)
+
# Now construct the queues
#
self._reset()
@@ -393,6 +422,8 @@ class Stream:
ignore_project_source_remotes=ignore_project_source_remotes,
)
+ self.query_cache(elements, sources=True)
+
# Delegated to a shared fetch method
self._fetch(elements, announce_session=True)
@@ -465,6 +496,8 @@ class Stream:
ignore_project_source_remotes=ignore_project_source_remotes,
)
+ self.query_cache(elements, sources=True)
+
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
@@ -513,6 +546,9 @@ class Stream:
raise StreamError("No artifact caches available for pulling artifacts")
_pipeline.assert_consistent(self._context, elements)
+
+ self.query_cache(elements)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -559,6 +595,8 @@ class Stream:
_pipeline.assert_consistent(self._context, elements)
+ self.query_cache(elements)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -621,6 +659,8 @@ class Stream:
self._check_location_writable(location, force=force, tar=tar)
+ self.query_cache(elements)
+
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -699,6 +739,8 @@ class Stream:
targets, selection=selection, connect_artifact_cache=True, load_artifacts=True
)
+ self.query_cache(target_objects)
+
if self._artifacts.has_fetch_remotes():
self._resolve_cached_remotely(target_objects)
@@ -718,6 +760,8 @@ class Stream:
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
artifact_logs = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -746,6 +790,8 @@ class Stream:
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
elements_to_files = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -835,6 +881,7 @@ class Stream:
)
# Assert all sources are cached in the source dir
+ self.query_cache(elements, sources=True)
self._fetch(elements)
_pipeline.assert_sources_cached(self._context, elements)
@@ -885,6 +932,7 @@ class Stream:
# If we're going to checkout, we need at least a fetch,
#
if not no_checkout:
+ self.query_cache(elements, sources=True)
self._fetch(elements, fetch_original=True)
expanded_directories = []
@@ -1547,6 +1595,8 @@ class Stream:
for element in artifacts:
element._set_required(_Scope.NONE)
+ self.query_cache(artifacts)
+
self._reset()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(artifacts)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 3b15bc1..943852f 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1948,6 +1948,9 @@ class Element(Plugin):
self.__artifact = artifact
return pulled
+ def _query_source_cache(self):
+ self.__sources.query_cache()
+
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
@@ -3256,13 +3259,6 @@ class Element(Plugin):
# In strict mode, the strong cache key always matches the strict cache key
self.__cache_key = self.__strict_cache_key
- # If we've newly calculated a cache key, our artifact's
- # current state will also change - after all, we can now find
- # a potential existing artifact.
- self._load_artifact(pull=False)
- if not self._pull_pending():
- self._load_artifact_done()
-
# Update the message kwargs in use for this plugin to dispatch messages with
#
self._message_kwargs["element_key"] = self._get_display_key()
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 858065d..6b31542 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -42,6 +42,9 @@ def _push(cli, cache_dir, project_dir, config_file, target):
# Initialize remotes
context.initialize_remotes(True, True, None, None)
+ # Query local cache
+ element._load_artifact(pull=False)
+
assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
assert element._push(), "Push operation failed"
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states):
def test_fetch_consistency_error(cli, datafiles):
project = str(datafiles)
- # When the error occurs outside of the scheduler at load time,
- # then the SourceError is reported directly as the main error.
result = cli.run(project=project, args=["source", "fetch", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles):
project = str(datafiles)
result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index 3dd686d..950cd83 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag
def test_track_consistency_error(cli, datafiles):
project = str(datafiles)
- # Track the element causing a consistency error
+ # Track the element causing a consistency error in `is_cached()`
result = cli.run(project=project, args=["source", "track", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
def test_track_consistency_bug(cli, datafiles):
project = str(datafiles)
- # Track the element causing an unhandled exception
+ # Track the element causing an unhandled exception in `is_cached()`
result = cli.run(project=project, args=["source", "track", "bug.bst"])
- # We expect BuildStream to fail gracefully, with no recorded exception.
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 86dac0b..46f39b0 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element._initialize_state()
# check that we have the source in the cas now and it's not fetched
+ element._query_source_cache()
assert element._cached_sources()
assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
@@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
# Check that the source in both in the source dir and the local CAS
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert element._cached_sources()
@@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index edbcfdf..76333ca 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -87,6 +87,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -137,6 +138,7 @@ def test_source_push(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..e0e7002 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles):
# seems to be the only way to get the sources?
element = project.load_elements(["import-bin.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
assert sourcecache.contains(source)
@@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
element = project.load_elements(["import-dev.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
@@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
element._initialize_state()
# check consistency of the source
+ element._query_source_cache()
assert not element._cached_sources()
res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- # Track will encounter an inconsistent submodule without any ref
+ # Track to update to the offending commit
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
+ # Fetch after track will encounter an inconsistent submodule without any ref
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+ result.assert_success()
+
# Assert that we are just fine without it, and emit a warning to the user.
assert "Ignoring inconsistent submodule" in result.stderr