You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:49:34 UTC
[buildstream] branch juerg/cache-query-job-benchmark created (now a0637f4)
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a change to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git.
at a0637f4 test
This branch includes the following new commits:
new 847703c tests/frontend/push.py: Allow pushing of dependencies
new a2ef29b tests/internals/pluginloading: Add missing get_ref() to FooSource
new fb47c89 fetchqueue.py: Don't skip elements with a cached failure
new afd578b _pipeline.py: Drop the optimization for cached elements in the planner
new fb36a74 _artifact.py: Make cache query explicit
new 75df8f8 _elementsources.py: Make cache query explicit
new 515eab7 element.py: Combine cache query and pull into `_load_artifact()`
new ca909d9 Move artifact and source cache query to a job thread
new c191ad8 Call _initialize_state() in Element._new_from_load_element()
new 9136861 hack for benchmark
new 9068295 pipe hack
new a0637f4 test
The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
[buildstream] 02/12: tests/internals/pluginloading: Add missing get_ref() to FooSource
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit a2ef29b0c891e284db0f8007599920c0a71929af
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Oct 8 17:07:29 2020 +0200
tests/internals/pluginloading: Add missing get_ref() to FooSource
---
tests/internals/pluginloading/customsource/pluginsources/foo.py | 3 +++
1 file changed, 3 insertions(+)
diff --git a/tests/internals/pluginloading/customsource/pluginsources/foo.py b/tests/internals/pluginloading/customsource/pluginsources/foo.py
index c5229f3..fd44a54 100644
--- a/tests/internals/pluginloading/customsource/pluginsources/foo.py
+++ b/tests/internals/pluginloading/customsource/pluginsources/foo.py
@@ -14,6 +14,9 @@ class FooSource(Source):
def get_unique_key(self):
pass
+ def get_ref(self):
+ pass
+
def setup():
return FooSource
[buildstream] 10/12: hack for benchmark
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 91368618e014b535b929a3e747610f941a1055cf
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 15 07:14:58 2020 +0100
hack for benchmark
---
src/buildstream/_scheduler/jobs/job.py | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index aa71b6e..c7e2624 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -22,6 +22,7 @@
# System imports
import asyncio
+import contextlib
import datetime
import itertools
import multiprocessing
@@ -495,13 +496,19 @@ class ChildJob:
self._pipe_w = pipe_w
self._messenger.set_message_handler(self._child_message_handler)
+ # FIXME
+ silence = self.action_name == "Cache-query"
+
# Time, log and and run the action function
#
- with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
- self._logfile, self._logdir
- ) as filename:
+ if silence:
+ record_cm = contextlib.suppress()
+ else:
+ record_cm = self._messenger.recorded_messages(self._logfile, self._logdir)
+ with self._messenger.timed_suspendable() as timeinfo, record_cm as filename:
try:
- self.message(MessageType.START, self.action_name, logfile=filename)
+ if not silence:
+ self.message(MessageType.START, self.action_name, logfile=filename)
with self._terminate_lock:
self._thread_id = threading.current_thread().ident
@@ -513,7 +520,8 @@ class ChildJob:
result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
+ if not silence:
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
# Alert parent of skip by return code
return _ReturnCode.SKIPPED, None
@@ -560,7 +568,8 @@ class ChildJob:
else:
# No exception occurred in the action
elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+ if not silence:
+ self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
[buildstream] 03/12: fetchqueue.py: Don't skip elements with a cached failure
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit fb47c892f072fed1eb0870d5fc84ed88fec48276
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Sep 17 17:44:50 2020 +0200
fetchqueue.py: Don't skip elements with a cached failure
The build queue requires the sources to be available for all elements
where `_cached_success()` returns `False`. This includes elements with a
cached failure.
---
src/buildstream/_scheduler/queues/fetchqueue.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 18bf392..3a4183d 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -50,7 +50,7 @@ class FetchQueue(Queue):
if not element._can_query_cache():
return QueueStatus.PENDING
- if element._cached():
+ if element._cached_success():
return QueueStatus.SKIP
# This will automatically skip elements which
[buildstream] 12/12: test
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit a0637f4172676d0b4b59377e79601807c2966c8a
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 22 18:14:16 2020 +0100
test
---
src/buildstream/_stream.py | 21 ++++++++++++++++-----
1 file changed, 16 insertions(+), 5 deletions(-)
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 6c4b2d9..72bc9f5 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -176,11 +176,22 @@ class Stream:
# Enqueue complete build plan as this is required to determine `buildable` status.
plan = list(self._pipeline.dependencies(elements, _Scope.ALL))
- self._scheduler.clear_queues()
- self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
- self._enqueue_plan(plan)
- self._run()
- self._scheduler.clear_queues()
+ for element in plan:
+ if not sources and element._get_cache_key(strength=_KeyStrength.WEAK):
+ element._load_artifact(pull=False)
+ if not element._can_query_cache() or not element._cached_success():
+ element._query_source_cache()
+ if not element._pull_pending():
+ element._load_artifact_done()
+ else:
+ element._query_source_cache()
+
+ if False:
+ self._scheduler.clear_queues()
+ self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
+ self._enqueue_plan(plan)
+ self._run()
+ self._scheduler.clear_queues()
# shell()
#
[buildstream] 06/12: _elementsources.py: Make cache query explicit
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 75df8f827462a1e966be40dd5575941784baaa85
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Sep 17 11:04:15 2020 +0200
_elementsources.py: Make cache query explicit
Cache query can be fairly expensive as it checks the presence of all
blobs. Make this more explicit with a `query_cache()` method, instead of
implicitly querying the cache on the first call to `cached()`.
---
src/buildstream/_elementsources.py | 31 ++++++++++++++++++++++++++-----
src/buildstream/_frontend/widget.py | 2 ++
src/buildstream/element.py | 10 ++++++++++
3 files changed, 38 insertions(+), 5 deletions(-)
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index a15d20e..9b4afe4 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -293,7 +293,7 @@ class ElementSources:
length = min(len(key), context.log_key_length)
return key[:length]
- # cached():
+ # query_cache():
#
# Check if the element sources are cached in CAS, generating the source
# cache keys if needed.
@@ -301,10 +301,7 @@ class ElementSources:
# Returns:
# (bool): True if the element sources are cached
#
- def cached(self):
- if self._cached is not None:
- return self._cached
-
+ def query_cache(self):
cas = self._context.get_cascache()
elementsourcescache = self._elementsourcescache
@@ -321,6 +318,28 @@ class ElementSources:
self._cached = True
return True
+ # can_query_cache():
+ #
+ # Returns whether the cache status is available.
+ #
+ # Returns:
+ # (bool): True if cache status is available
+ #
+ def can_query_cache(self):
+ return self._cached is not None
+
+ # cached()
+ #
+ # Return whether the element sources are cached in CAS. This must be
+ # called only when all sources are resolved.
+ #
+ # Returns:
+ # (bool): True if the element sources are cached
+ #
+ def cached(self):
+ assert self._cached is not None
+ return self._cached
+
# is_resolved():
#
# Get whether all sources of the element are resolved
@@ -368,6 +387,8 @@ class ElementSources:
unique_key = self.get_unique_key()
self._cache_key = _cachekey.generate_key(unique_key)
+ self.query_cache()
+
# preflight():
#
# A internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index d289ef2..ad6c813 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -363,6 +363,8 @@ class LogLine(Widget):
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
line = p.fmt_subst(line, "state", "cached", fg="magenta")
+ elif not element._can_query_source_cache():
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._fetch_needed():
line = p.fmt_subst(line, "state", "fetch needed", fg="red")
elif element._buildable():
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 0f2a01c..c4f0479 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1296,6 +1296,16 @@ class Element(Plugin):
# cache cannot be queried until strict cache key is available
return self.__artifact is not None
+ # _can_query_source_cache():
+ #
+ # Returns whether the source cache status is available.
+ #
+ # Returns:
+ # (bool): True if source cache can be queried
+ #
+ def _can_query_source_cache(self):
+ return self.__sources.can_query_cache()
+
# _initialize_state()
#
# Compute up the elment's initial state. Element state contains
[buildstream] 05/12: _artifact.py: Make cache query explicit
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit fb36a74815ae858c7e7623bfee45b931aad6b9dc
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Sep 15 11:07:10 2020 +0200
_artifact.py: Make cache query explicit
Cache query can be fairly expensive as it checks the presence of all
artifact blobs. Make this more explicit with a `query_cache()` method,
instead of implicitly querying the cache on the first call to
`cached()`.
---
src/buildstream/_artifact.py | 29 ++++++++++++++---------------
src/buildstream/_frontend/widget.py | 2 ++
src/buildstream/element.py | 16 +++++++++-------
3 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index d4a716f..b63cff6 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -540,7 +540,7 @@ class Artifact:
return dependency_refs
- # cached():
+ # query_cache():
#
# Check whether the artifact corresponding to the stored cache key is
# available. This also checks whether all required parts of the artifact
@@ -550,11 +550,7 @@ class Artifact:
# Returns:
# (bool): Whether artifact is in local cache
#
- def cached(self):
-
- if self._cached is not None:
- return self._cached
-
+ def query_cache(self):
context = self._context
artifact = self._load_proto()
@@ -587,6 +583,18 @@ class Artifact:
self._cached = True
return True
+ # cached()
+ #
+ # Return whether the artifact is available in the local cache. This must
+ # be called after `query_cache()` or `set_cached()`.
+ #
+ # Returns:
+ # (bool): Whether artifact is in local cache
+ #
+ def cached(self):
+ assert self._cached is not None
+ return self._cached
+
# cached_logs()
#
# Check if the artifact is cached with log files.
@@ -600,15 +608,6 @@ class Artifact:
# If the artifact is cached, its log files are available as well.
return self._element._cached()
- # reset_cached()
- #
- # Allow the Artifact to query the filesystem to determine whether it
- # is cached or not.
- #
- def reset_cached(self):
- self._proto = None
- self._cached = None
-
# set_cached()
#
# Mark the artifact as cached without querying the filesystem.
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index 0d5379f..d289ef2 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -357,6 +357,8 @@ class LogLine(Widget):
else:
if element.get_kind() == "junction":
line = p.fmt_subst(line, "state", "junction", fg="magenta")
+ elif not element._can_query_cache():
+ line = p.fmt_subst(line, "state", "unknown", fg="bright_black")
elif element._cached_failure():
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 945ca5c..0f2a01c 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1179,9 +1179,6 @@ class Element(Plugin):
# the artifact cache
#
def _cached(self):
- if not self.__artifact:
- return False
-
return self.__artifact.cached()
# _cached_remotely():
@@ -1297,7 +1294,7 @@ class Element(Plugin):
#
def _can_query_cache(self):
# cache cannot be queried until strict cache key is available
- return self.__strict_cache_key is not None
+ return self.__artifact is not None
# _initialize_state()
#
@@ -1585,7 +1582,9 @@ class Element(Plugin):
def __should_schedule(self):
# We're processing if we're already scheduled, we've
# finished assembling or if we're waiting to pull.
- processing = self.__assemble_scheduled or self.__assemble_done or self._pull_pending()
+ processing = (
+ self.__assemble_scheduled or self.__assemble_done or (self._can_query_cache() and self._pull_pending())
+ )
# We should schedule a build when
return (
@@ -1651,7 +1650,7 @@ class Element(Plugin):
self.__artifact.set_cached()
self.__cached_successfully = True
else:
- self.__artifact.reset_cached()
+ self.__artifact.query_cache()
# When we're building in non-strict mode, we may have
# assembled everything to this point without a strong cache
@@ -1898,7 +1897,7 @@ class Element(Plugin):
# Artifact may become cached after pulling, so let it query the
# filesystem again to check
- self.__artifact.reset_cached()
+ self.__artifact.query_cache()
# We may not have actually pulled an artifact - the pull may
# have failed. We might therefore need to schedule assembly.
@@ -2570,6 +2569,7 @@ class Element(Plugin):
return None
artifact = Artifact(self, self._get_context(), strong_key=workspace.last_build)
+ artifact.query_cache()
if not artifact.cached():
return None
@@ -3279,12 +3279,14 @@ class Element(Plugin):
strict_key=self.__strict_cache_key,
weak_key=self.__weak_cache_key,
)
+ strict_artifact.query_cache()
if context.get_strict() or strict_artifact.cached():
self.__artifact = strict_artifact
else:
self.__artifact = Artifact(
self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
)
+ self.__artifact.query_cache()
if not context.get_strict() and self.__artifact.cached():
# In non-strict mode, strong cache key becomes available when
[buildstream] 09/12: Call _initialize_state() in Element._new_from_load_element()
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c191ad86da2e373f1e49271e0b898776d9b3b1c5
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Thu Oct 8 17:09:00 2020 +0200
Call _initialize_state() in Element._new_from_load_element()
With the cache queries moved to job threads, `_initialize_state()` is
fairly lightweight and can be called earlier.
---
src/buildstream/_loader/loadelement.pyx | 1 -
src/buildstream/_loader/loader.py | 1 -
src/buildstream/_pipeline.py | 30 ------------------------------
src/buildstream/_stream.py | 1 -
src/buildstream/element.py | 4 ++++
tests/artifactcache/push.py | 7 -------
tests/sourcecache/fetch.py | 6 ------
tests/sourcecache/push.py | 2 --
tests/sourcecache/staging.py | 3 ---
9 files changed, 4 insertions(+), 51 deletions(-)
diff --git a/src/buildstream/_loader/loadelement.pyx b/src/buildstream/_loader/loadelement.pyx
index 210869e..f69e138 100644
--- a/src/buildstream/_loader/loadelement.pyx
+++ b/src/buildstream/_loader/loadelement.pyx
@@ -286,7 +286,6 @@ cdef class LoadElement:
from ..element import Element
element = Element._new_from_load_element(self)
- element._initialize_state()
# Custom error for link dependencies, since we don't completely
# parse their dependencies we cannot rely on the built-in ElementError.
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 3d0fb65..bf9e819 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -816,7 +816,6 @@ class Loader:
)
element = Element._new_from_load_element(load_element)
- element._initialize_state()
# Handle the case where a subproject has no ref
#
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index 7aec985..39f5683 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -73,36 +73,6 @@ class Pipeline:
return tuple(element_groups)
- # resolve_elements()
- #
- # Resolve element state and cache keys.
- #
- # Args:
- # targets (list of Element): The list of toplevel element targets
- #
- def resolve_elements(self, targets):
- with self._context.messenger.simple_task("Resolving cached state", silent_nested=True) as task:
- # We need to go through the project to access the loader
- if task:
- task.set_maximum_progress(self._project.loader.loaded)
-
- # XXX: Now that Element._update_state() can trigger recursive update_state calls
- # it is possible that we could get a RecursionError. However, this is unlikely
- # to happen, even for large projects (tested with the Debian stack). Although,
- # if it does become a problem we may have to set the recursion limit to a
- # greater value.
- for element in self.dependencies(targets, _Scope.ALL):
- # Determine initial element state.
- element._initialize_state()
-
- # We may already have Elements which are cached and have their runtimes
- # cached, if this is the case, we should immediately notify their reverse
- # dependencies.
- element._update_ready_for_runtime_and_cached()
-
- if task:
- task.add_current_progress()
-
# check_remotes()
#
# Check if the target artifact is cached in any of the available remotes
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 0558a12..6c4b2d9 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -1359,7 +1359,6 @@ class Stream:
# Now move on to loading primary selection.
#
- self._pipeline.resolve_elements(self.targets)
selected = self._pipeline.get_selection(self.targets, selection, silent=False)
selected = self._pipeline.except_elements(self.targets, selected, except_elements)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index 7dcbc32..d9445a4 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1132,6 +1132,8 @@ class Element(Plugin):
element.__preflight()
+ element._initialize_state()
+
if task:
task.add_current_progress()
@@ -2885,6 +2887,8 @@ class Element(Plugin):
self.__strict_cache_key = artifact.strict_key
self.__weak_cache_key = artifact.weak_key
+ self._initialize_state()
+
@classmethod
def __compose_default_splits(cls, project, defaults, first_pass):
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 17ad2e2..cd9930e 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -6,7 +6,6 @@ import os
import pytest
from buildstream import _yaml
-from buildstream.types import _Scope
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream.testing import cli # pylint: disable=unused-import
@@ -33,12 +32,6 @@ def _push(cli, cache_dir, project_dir, config_file, target):
# Create a local artifact cache handle
artifactcache = context.artifactcache
- # Ensure the element's artifact memeber is initialised
- # This is duplicated from Pipeline.resolve_elements()
- # as this test does not use the cli frontend.
- for e in element._dependencies(_Scope.ALL):
- e._initialize_state()
-
# Manually setup the CAS remotes
artifactcache.setup_remotes(use_config=True)
artifactcache.initialize_remotes()
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 40076e4..04ed4ee 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -74,7 +74,6 @@ def test_source_fetch(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -115,7 +114,6 @@ def test_source_fetch(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
# check that we have the source in the cas now and it's not fetched
element._query_source_cache()
@@ -136,7 +134,6 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -155,7 +152,6 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
# Check that the source in both in the source dir and the local CAS
element = project.load_elements([element_name])[0]
- element._initialize_state()
element._query_source_cache()
assert element._cached_sources()
@@ -172,7 +168,6 @@ def test_pull_fail(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -205,7 +200,6 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements([element_name])[0]
- element._initialize_state()
element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index aa703de..9a06a88 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -84,7 +84,6 @@ def test_source_push_split(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements(["push.bst"])[0]
- element._initialize_state()
element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -135,7 +134,6 @@ def test_source_push(cli, tmpdir, datafiles):
project.ensure_fully_loaded()
element = project.load_elements(["push.bst"])[0]
- element._initialize_state()
element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index e0e7002..6671c79 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -64,7 +64,6 @@ def test_source_staged(tmpdir, cli, datafiles):
# now check that the source is in the refs file, this is pretty messy but
# seems to be the only way to get the sources?
element = project.load_elements(["import-bin.bst"])[0]
- element._initialize_state()
element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
@@ -100,7 +99,6 @@ def test_source_fetch(tmpdir, cli, datafiles):
sourcecache = context.sourcecache
element = project.load_elements(["import-dev.bst"])[0]
- element._initialize_state()
element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
@@ -135,7 +133,6 @@ def test_staged_source_build(tmpdir, datafiles, cli):
project.ensure_fully_loaded()
element = project.load_elements(["import-dev.bst"])[0]
- element._initialize_state()
# check consistency of the source
element._query_source_cache()
[buildstream] 08/12: Move artifact and source cache query to a job thread
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ca909d9c41f074858dd4d24d9b3cc226b7a1c3f8
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Dec 14 19:43:40 2020 +0100
Move artifact and source cache query to a job thread
This allows parallelization of cache queries.
---
src/buildstream/_elementsources.py | 5 +-
src/buildstream/_frontend/cli.py | 2 +
src/buildstream/_loader/loader.py | 1 +
src/buildstream/_scheduler/__init__.py | 1 +
.../_scheduler/queues/cachequeryqueue.py | 66 ++++++++++++++++++++++
src/buildstream/_scheduler/queues/fetchqueue.py | 2 +-
src/buildstream/_scheduler/queues/pullqueue.py | 4 --
src/buildstream/_stream.py | 44 +++++++++++++++
src/buildstream/element.py | 10 +---
tests/artifactcache/push.py | 3 +
tests/frontend/fetch.py | 8 +--
tests/frontend/track.py | 12 ++--
tests/sourcecache/fetch.py | 6 ++
tests/sourcecache/push.py | 2 +
tests/sourcecache/staging.py | 3 +
tests/sources/git.py | 6 +-
16 files changed, 151 insertions(+), 24 deletions(-)
diff --git a/src/buildstream/_elementsources.py b/src/buildstream/_elementsources.py
index 9b4afe4..d426ee6 100644
--- a/src/buildstream/_elementsources.py
+++ b/src/buildstream/_elementsources.py
@@ -211,6 +211,9 @@ class ElementSources:
# SourceError: If one of the element sources has an error
#
def fetch(self):
+ if self._cached is None:
+ self.query_cache()
+
if self.cached():
return
@@ -387,8 +390,6 @@ class ElementSources:
unique_key = self.get_unique_key()
self._cache_key = _cachekey.generate_key(unique_key)
- self.query_cache()
-
# preflight():
#
# A internal wrapper for calling the abstract preflight() method on
diff --git a/src/buildstream/_frontend/cli.py b/src/buildstream/_frontend/cli.py
index ab06e8a..cad1ee0 100644
--- a/src/buildstream/_frontend/cli.py
+++ b/src/buildstream/_frontend/cli.py
@@ -551,6 +551,8 @@ def show(app, elements, deps, except_, order, format_):
dependencies = app.stream.load_selection(elements, selection=deps, except_targets=except_)
+ app.stream.query_cache(dependencies)
+
if order == "alpha":
dependencies = sorted(dependencies)
diff --git a/src/buildstream/_loader/loader.py b/src/buildstream/_loader/loader.py
index 6ace362..3d0fb65 100644
--- a/src/buildstream/_loader/loader.py
+++ b/src/buildstream/_loader/loader.py
@@ -830,6 +830,7 @@ class Loader:
# Handle the case where a subproject needs to be fetched
#
+ element._query_source_cache()
if element._should_fetch():
self.load_context.fetch_subprojects([element])
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d2f458f..fcde00d 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -25,6 +25,7 @@ from .queues.trackqueue import TrackQueue
from .queues.buildqueue import BuildQueue
from .queues.artifactpushqueue import ArtifactPushQueue
from .queues.pullqueue import PullQueue
+from .queues.cachequeryqueue import CacheQueryQueue
from .scheduler import Scheduler, SchedStatus
from .jobs import ElementJob, JobStatus
diff --git a/src/buildstream/_scheduler/queues/cachequeryqueue.py b/src/buildstream/_scheduler/queues/cachequeryqueue.py
new file mode 100644
index 0000000..f447ab5
--- /dev/null
+++ b/src/buildstream/_scheduler/queues/cachequeryqueue.py
@@ -0,0 +1,66 @@
+#
+# Copyright (C) 2020 Bloomberg Finance LP
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+
+from . import Queue, QueueStatus
+from ..resources import ResourceType
+from ..jobs import JobStatus
+from ...types import _KeyStrength
+
+
+# A queue which queries the cache for artifacts and sources
+#
+class CacheQueryQueue(Queue):
+
+ action_name = "Cache-query"
+ complete_name = "Cache queried"
+ resources = [ResourceType.PROCESS, ResourceType.CACHE]
+
+ def __init__(self, scheduler, *, sources=False):
+ super().__init__(scheduler)
+
+ self._query_sources = sources
+
+ def get_process_func(self):
+ if not self._query_sources:
+ return CacheQueryQueue._query_artifacts_or_sources
+ else:
+ return CacheQueryQueue._query_sources
+
+ def status(self, element):
+ if not element._get_cache_key(strength=_KeyStrength.WEAK):
+ # Strict and weak cache keys are unavailable if the element or
+ # a dependency has an unresolved source
+ return QueueStatus.SKIP
+
+ return QueueStatus.READY
+
+ def done(self, _, element, result, status):
+ if status is JobStatus.FAIL:
+ return
+
+ if not self._query_sources:
+ if not element._pull_pending():
+ element._load_artifact_done()
+
+ @staticmethod
+ def _query_artifacts_or_sources(element):
+ element._load_artifact(pull=False)
+ if not element._can_query_cache() or not element._cached_success():
+ element._query_source_cache()
+
+ @staticmethod
+ def _query_sources(element):
+ element._query_source_cache()
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index 3a4183d..ee84982 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -56,7 +56,7 @@ class FetchQueue(Queue):
# This will automatically skip elements which
# have no sources.
- if not element._should_fetch(self._should_fetch_original):
+ if element._can_query_source_cache() and not element._should_fetch(self._should_fetch_original):
return QueueStatus.SKIP
return QueueStatus.READY
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index ecff02c..9860256 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -43,10 +43,6 @@ class PullQueue(Queue):
return QueueStatus.SKIP
def done(self, _, element, result, status):
-
- if status is JobStatus.FAIL:
- return
-
element._load_artifact_done()
@staticmethod
diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index e05100f..0558a12 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -37,6 +37,7 @@ from ._scheduler import (
Scheduler,
SchedStatus,
TrackQueue,
+ CacheQueryQueue,
FetchQueue,
SourcePushQueue,
BuildQueue,
@@ -162,6 +163,25 @@ class Stream:
return target_objects
+ # query_cache()
+ #
+ # Query the artifact and source caches to determine the cache status
+ # of the specified elements.
+ #
+ # Args:
+ # elements (list of Element): The elements to check
+ #
+ def query_cache(self, elements, *, sources=False):
+ with self._context.messenger.timed_activity("Query cache", silent_nested=True):
+ # Enqueue complete build plan as this is required to determine `buildable` status.
+ plan = list(self._pipeline.dependencies(elements, _Scope.ALL))
+
+ self._scheduler.clear_queues()
+ self._add_queue(CacheQueryQueue(self._scheduler, sources=sources), track=True)
+ self._enqueue_plan(plan)
+ self._run()
+ self._scheduler.clear_queues()
+
# shell()
#
# Run a shell
@@ -208,6 +228,8 @@ class Stream:
element = self.targets[0]
element._set_required(scope)
+ self.query_cache([element] + elements)
+
if pull_:
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
@@ -244,6 +266,7 @@ class Stream:
# Ensure we have our sources if we are launching a build shell
if scope == _Scope.BUILD and not usebuildtree:
+ self.query_cache([element], sources=True)
self._fetch([element])
self._pipeline.assert_sources_cached([element])
@@ -294,6 +317,8 @@ class Stream:
for element in self.targets:
element._set_artifact_files_required(scope=scope)
+ self.query_cache(elements)
+
# Now construct the queues
#
self._scheduler.clear_queues()
@@ -339,6 +364,8 @@ class Stream:
source_remote_url=remote,
)
+ self.query_cache(elements, sources=True)
+
# Delegated to a shared fetch method
self._fetch(elements, announce_session=True)
@@ -403,6 +430,8 @@ class Stream:
load_artifacts=True,
)
+ self.query_cache(elements, sources=True)
+
if not self._sourcecache.has_push_remotes():
raise StreamError("No source caches available for pushing sources")
@@ -447,6 +476,9 @@ class Stream:
raise StreamError("No artifact caches available for pulling artifacts")
self._pipeline.assert_consistent(elements)
+
+ self.query_cache(elements)
+
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._enqueue_plan(elements)
@@ -489,6 +521,8 @@ class Stream:
self._pipeline.assert_consistent(elements)
+ self.query_cache(elements)
+
self._scheduler.clear_queues()
self._add_queue(PullQueue(self._scheduler))
self._add_queue(ArtifactPushQueue(self._scheduler))
@@ -539,6 +573,8 @@ class Stream:
self._check_location_writable(location, force=force, tar=tar)
+ self.query_cache(elements)
+
uncached_elts = [elt for elt in elements if not elt._cached()]
if uncached_elts and pull:
self._context.messenger.info("Attempting to fetch missing or incomplete artifact")
@@ -617,6 +653,8 @@ class Stream:
targets, selection=selection, use_artifact_config=True, load_artifacts=True
)
+ self.query_cache(target_objects)
+
if self._artifacts.has_fetch_remotes():
self._pipeline.check_remotes(target_objects)
@@ -636,6 +674,8 @@ class Stream:
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
artifact_logs = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -664,6 +704,8 @@ class Stream:
# Return list of Element and/or ArtifactElement objects
target_objects = self.load_selection(targets, selection=_PipelineSelection.NONE, load_artifacts=True)
+ self.query_cache(target_objects)
+
elements_to_files = {}
for obj in target_objects:
ref = obj.get_artifact_name()
@@ -742,6 +784,7 @@ class Stream:
elements = self._load((target,), selection=deps, except_targets=except_targets)
# Assert all sources are cached in the source dir
+ self.query_cache(elements, sources=True)
self._fetch(elements)
self._pipeline.assert_sources_cached(elements)
@@ -775,6 +818,7 @@ class Stream:
# If we're going to checkout, we need at least a fetch,
#
if not no_checkout:
+ self.query_cache(elements, sources=True)
self._fetch(elements, fetch_original=True)
expanded_directories = []
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index f5f8a71..7dcbc32 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -1945,6 +1945,9 @@ class Element(Plugin):
self.__artifact = artifact
return pulled
+ def _query_source_cache(self):
+ self.__sources.query_cache()
+
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
@@ -3256,13 +3259,6 @@ class Element(Plugin):
# In strict mode, the strong cache key always matches the strict cache key
self.__cache_key = self.__strict_cache_key
- # If we've newly calculated a cache key, our artifact's
- # current state will also change - after all, we can now find
- # a potential existing artifact.
- self._load_artifact(pull=False)
- if not self._pull_pending():
- self._load_artifact_done()
-
# Update the message kwargs in use for this plugin to dispatch messages with
#
self._message_kwargs["element_key"] = self._get_display_key()
diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py
index 74062ce..17ad2e2 100644
--- a/tests/artifactcache/push.py
+++ b/tests/artifactcache/push.py
@@ -43,6 +43,9 @@ def _push(cli, cache_dir, project_dir, config_file, target):
artifactcache.setup_remotes(use_config=True)
artifactcache.initialize_remotes()
+ # Query local cache
+ element._load_artifact(pull=False)
+
assert artifactcache.has_push_remotes(plugin=element), "No remote configured for element target.bst"
assert element._push(), "Push operation failed"
diff --git a/tests/frontend/fetch.py b/tests/frontend/fetch.py
index b2c9d64..6c8a4b7 100644
--- a/tests/frontend/fetch.py
+++ b/tests/frontend/fetch.py
@@ -62,10 +62,9 @@ def test_fetch_deps(cli, datafiles, deps, expected_states):
def test_fetch_consistency_error(cli, datafiles):
project = str(datafiles)
- # When the error occurs outside of the scheduler at load time,
- # then the SourceError is reported directly as the main error.
result = cli.run(project=project, args=["source", "fetch", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.SOURCE, "the-consistency-error")
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
@@ -73,7 +72,8 @@ def test_fetch_consistency_bug(cli, datafiles):
project = str(datafiles)
result = cli.run(project=project, args=["source", "fetch", "bug.bst"])
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ result.assert_main_error(ErrorDomain.STREAM, None)
+ result.assert_task_error(ErrorDomain.PLUGIN, "source-bug")
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/frontend/track.py b/tests/frontend/track.py
index bd84449..d1a9324 100644
--- a/tests/frontend/track.py
+++ b/tests/frontend/track.py
@@ -248,20 +248,22 @@ def test_track_cross_junction(cli, tmpdir, datafiles, cross_junction, ref_storag
def test_track_consistency_error(cli, datafiles):
project = str(datafiles)
- # Track the element causing a consistency error
+ # Track the element causing a consistency error in `is_cached()`
result = cli.run(project=project, args=["source", "track", "error.bst"])
- result.assert_main_error(ErrorDomain.SOURCE, "the-consistency-error")
+
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(os.path.join(TOP_DIR, "consistencyerror"))
def test_track_consistency_bug(cli, datafiles):
project = str(datafiles)
- # Track the element causing an unhandled exception
+ # Track the element causing an unhandled exception in `is_cached()`
result = cli.run(project=project, args=["source", "track", "bug.bst"])
- # We expect BuildStream to fail gracefully, with no recorded exception.
- result.assert_main_error(ErrorDomain.PLUGIN, "source-bug")
+ # We expect tracking to succeed as `is_cached()` is not required for tracking.
+ result.assert_success()
@pytest.mark.datafiles(DATA_DIR)
diff --git a/tests/sourcecache/fetch.py b/tests/sourcecache/fetch.py
index 76f5508..40076e4 100644
--- a/tests/sourcecache/fetch.py
+++ b/tests/sourcecache/fetch.py
@@ -75,6 +75,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -117,6 +118,7 @@ def test_source_fetch(cli, tmpdir, datafiles):
element._initialize_state()
# check that we have the source in the cas now and it's not fetched
+ element._query_source_cache()
assert element._cached_sources()
assert os.listdir(os.path.join(str(tmpdir), "cache", "sources", "git")) == []
@@ -135,6 +137,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -153,6 +156,7 @@ def test_fetch_fallback(cli, tmpdir, datafiles):
# Check that the source in both in the source dir and the local CAS
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert element._cached_sources()
@@ -169,6 +173,7 @@ def test_pull_fail(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -201,6 +206,7 @@ def test_source_pull_partial_fallback_fetch(cli, tmpdir, datafiles):
element = project.load_elements([element_name])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/push.py b/tests/sourcecache/push.py
index 25a4309..aa703de 100644
--- a/tests/sourcecache/push.py
+++ b/tests/sourcecache/push.py
@@ -85,6 +85,7 @@ def test_source_push_split(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
@@ -135,6 +136,7 @@ def test_source_push(cli, tmpdir, datafiles):
element = project.load_elements(["push.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
assert not element._cached_sources()
source = list(element.sources())[0]
diff --git a/tests/sourcecache/staging.py b/tests/sourcecache/staging.py
index 0f2f058..e0e7002 100644
--- a/tests/sourcecache/staging.py
+++ b/tests/sourcecache/staging.py
@@ -65,6 +65,7 @@ def test_source_staged(tmpdir, cli, datafiles):
# seems to be the only way to get the sources?
element = project.load_elements(["import-bin.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
assert sourcecache.contains(source)
@@ -100,6 +101,7 @@ def test_source_fetch(tmpdir, cli, datafiles):
element = project.load_elements(["import-dev.bst"])[0]
element._initialize_state()
+ element._query_source_cache()
source = list(element.sources())[0]
assert element._cached_sources()
@@ -136,6 +138,7 @@ def test_staged_source_build(tmpdir, datafiles, cli):
element._initialize_state()
# check consistency of the source
+ element._query_source_cache()
assert not element._cached_sources()
res = cli.run(project=project_dir, args=["build", "target.bst"])
diff --git a/tests/sources/git.py b/tests/sources/git.py
index 30657d8..861e70c 100644
--- a/tests/sources/git.py
+++ b/tests/sources/git.py
@@ -401,10 +401,14 @@ def test_submodule_track_ignore_inconsistent(cli, tmpdir, datafiles):
result = cli.run(project=project, args=["source", "fetch", "target.bst"])
result.assert_success()
- # Track will encounter an inconsistent submodule without any ref
+ # Track to update to the offending commit
result = cli.run(project=project, args=["source", "track", "target.bst"])
result.assert_success()
+ # Fetch after track will encounter an inconsistent submodule without any ref
+ result = cli.run(project=project, args=["source", "fetch", "target.bst"])
+ result.assert_success()
+
# Assert that we are just fine without it, and emit a warning to the user.
assert "Ignoring inconsistent submodule" in result.stderr
[buildstream] 11/12: pipe hack
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 9068295c9a73197a5e49b9b83a8648850c0961af
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Tue Dec 15 07:44:12 2020 +0100
pipe hack
---
src/buildstream/_scheduler/jobs/job.py | 19 +++++++++++++++----
1 file changed, 15 insertions(+), 4 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c7e2624..7579d24 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -145,10 +145,15 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
# FIXME: remove this, this is not necessary when using asyncio
- self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
+ silence = self.action_name == "Cache-query"
+ if silence:
+ self._pipe_r = pipe_w = None
+ else:
+ self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
self._tries += 1
- self._parent_start_listening()
+ if not silence:
+ self._parent_start_listening()
# FIXME: remove the parent/child separation, it's not needed anymore.
self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
@@ -350,7 +355,8 @@ class Job:
self._scheduler.job_completed(self, status)
# Force the deletion of the pipe and process objects to try and clean up FDs
- self._pipe_r.close()
+ if self._pipe_r:
+ self._pipe_r.close()
self._pipe_r = self._task = None
# _parent_process_pipe()
@@ -359,6 +365,8 @@ class Job:
# in the parent process.
#
def _parent_process_pipe(self):
+ if not self._pipe_r:
+ return
while self._pipe_r.poll():
try:
message = self._pipe_r.recv()
@@ -581,7 +589,8 @@ class ChildJob:
self._thread_id = None
return _ReturnCode.TERMINATED, None
finally:
- self._pipe_w.close()
+ if self._pipe_w:
+ self._pipe_w.close()
# terminate()
#
@@ -620,6 +629,8 @@ class ChildJob:
# is_silenced (bool) : Whether messages are silenced
#
def _child_message_handler(self, message, is_silenced):
+ if self.action_name == "Cache-query":
+ return
message.action_name = self.action_name
message.task_element_name = self._message_element_name
[buildstream] 04/12: _pipeline.py: Drop the optimization for cached
elements in the planner
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit afd578ba876b608cb88b875b688c611bdde5eec4
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Wed Sep 16 15:55:26 2020 +0200
_pipeline.py: Drop the optimization for cached elements in the planner
The overhead of planning already cached elements and unneeded build-only
dependencies should be fairly small, as unneeded jobs can still be
skipped. This optimization was also already disabled for non-strict
build plans with a remote artifact cache.
This change is necessary in preparation for parallelizing cache queries.
---
src/buildstream/_pipeline.py | 26 +++++++++-----------------
1 file changed, 9 insertions(+), 17 deletions(-)
diff --git a/src/buildstream/_pipeline.py b/src/buildstream/_pipeline.py
index d53fc9d..7aec985 100644
--- a/src/buildstream/_pipeline.py
+++ b/src/buildstream/_pipeline.py
@@ -141,9 +141,8 @@ class Pipeline:
# plan()
#
# Generator function to iterate over only the elements
- # which are required to build the pipeline target, omitting
- # cached elements. The elements are yielded in a depth sorted
- # ordering for optimal build plans
+ # which are required to build the pipeline target. The elements are
+ # yielded in a depth sorted ordering for optimal build plans
#
# Args:
# elements (list of Element): List of target elements to plan
@@ -152,11 +151,7 @@ class Pipeline:
# (list of Element): A depth sorted list of the build plan
#
def plan(self, elements):
- # Keep locally cached elements in the plan if remote artifact cache is used
- # to allow pulling artifact with strict cache key, if available.
- plan_cached = not self._context.get_strict() and self._artifacts.has_fetch_remotes()
-
- return _Planner().plan(elements, plan_cached)
+ return _Planner().plan(elements)
# get_selection()
#
@@ -421,9 +416,8 @@ class Pipeline:
# _Planner()
#
# An internal object used for constructing build plan
-# from a given resolved toplevel element, while considering what
-# parts need to be built depending on build only dependencies
-# being cached, and depth sorting for more efficient processing.
+# from a given resolved toplevel element, using depth
+# sorting for more efficient processing.
#
class _Planner:
def __init__(self):
@@ -447,15 +441,13 @@ class _Planner:
for dep in element._dependencies(_Scope.RUN, recurse=False):
self.plan_element(dep, depth)
- # Dont try to plan builds of elements that are cached already
- if not element._cached_success():
- for dep in element._dependencies(_Scope.BUILD, recurse=False):
- self.plan_element(dep, depth + 1)
+ for dep in element._dependencies(_Scope.BUILD, recurse=False):
+ self.plan_element(dep, depth + 1)
self.depth_map[element] = depth
self.visiting_elements.remove(element)
- def plan(self, roots, plan_cached):
+ def plan(self, roots):
for root in roots:
self.plan_element(root, 0)
@@ -465,4 +457,4 @@ class _Planner:
for index, item in enumerate(depth_sorted):
item[0]._set_depth(index)
- return [item[0] for item in depth_sorted if plan_cached or not item[0]._cached_success()]
+ return [item[0] for item in depth_sorted]
[buildstream] 07/12: element.py: Combine cache query and pull into
`_load_artifact()`
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 515eab747f44f4f19d0598d6b8e7cd9a339d52ca
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Dec 14 21:36:08 2020 +0100
element.py: Combine cache query and pull into `_load_artifact()`
In non-strict mode, cache query and pull are intertwined, as we prefer
pulling the strict artifact to a cache lookup with the weak cache key.
This replaces `__update_artifact_state()` and `_pull()` with a combined
`_load_artifact()` method, which supports cache query with deferred
pulling. This provides correct behavior with the flexibility of split or
combined cache query and pull.
---
src/buildstream/_artifact.py | 7 +-
src/buildstream/_frontend/widget.py | 2 +-
src/buildstream/_scheduler/queues/pullqueue.py | 13 +-
src/buildstream/element.py | 170 ++++++++++---------------
4 files changed, 72 insertions(+), 120 deletions(-)
diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index b63cff6..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -591,9 +591,12 @@ class Artifact:
# Returns:
# (bool): Whether artifact is in local cache
#
- def cached(self):
+ def cached(self, *, buildtree=False):
assert self._cached is not None
- return self._cached
+ ret = self._cached
+ if buildtree:
+ ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+ return ret
# cached_logs()
#
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index ad6c813..3ef64d3 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -358,7 +358,7 @@ class LogLine(Widget):
if element.get_kind() == "junction":
line = p.fmt_subst(line, "state", "junction", fg="magenta")
elif not element._can_query_cache():
- line = p.fmt_subst(line, "state", "unknown", fg="bright_black")
+ line = p.fmt_subst(line, "state", "waiting", fg="blue")
elif element._cached_failure():
line = p.fmt_subst(line, "state", "failed", fg="red")
elif element._cached_success():
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..ecff02c 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -37,9 +37,6 @@ class PullQueue(Queue):
return PullQueue._pull_or_skip
def status(self, element):
- if not element._can_query_cache():
- return QueueStatus.PENDING
-
if element._pull_pending():
return QueueStatus.READY
else:
@@ -50,15 +47,9 @@ class PullQueue(Queue):
if status is JobStatus.FAIL:
return
- element._pull_done()
-
- def register_pending_element(self, element):
- # Set a "can_query_cache"_callback for an element which is not
- # immediately ready to query the artifact cache so that it
- # may be pulled.
- element._set_can_query_cache_callback(self._enqueue_element)
+ element._load_artifact_done()
@staticmethod
def _pull_or_skip(element):
- if not element._pull():
+ if not element._load_artifact(pull=True):
raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index c4f0479..f5f8a71 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -290,7 +290,7 @@ class Element(Plugin):
self.__sourcecache = context.sourcecache # Source cache
self.__assemble_scheduled = False # Element is scheduled to be assembled
self.__assemble_done = False # Element is assembled
- self.__pull_done = False # Whether pull was attempted
+ self.__pull_pending = False # Whether pull is pending
self.__cached_successfully = None # If the Element is known to be successfully cached
self.__splits = None # Resolved regex objects for computing split domains
self.__whitelist_regex = None # Resolved regex object to check if file is allowed to overlap
@@ -1332,9 +1332,6 @@ class Element(Plugin):
#
# - __update_cache_keys()
# - Computes the strong and weak cache keys.
- # - _update_artifact_state()
- # - Computes the state of the element's artifact using the
- # cache key.
# - __schedule_assembly_when_necessary()
# - Schedules assembly of an element, iff its current state
# allows/necessitates it
@@ -1869,83 +1866,85 @@ class Element(Plugin):
# (bool): Whether a pull operation is pending
#
def _pull_pending(self):
- if self._get_workspace():
- # Workspace builds are never pushed to artifact servers
- return False
-
- # Check whether the pull has been invoked with a specific subdir requested
- # in user context, as to complete a partial artifact
- pull_buildtrees = self._get_context().pull_buildtrees
-
- if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
- if pull_buildtrees:
- # If we've specified a subdir, check if the subdir is cached locally
- # or if it's possible to get
- if self._cached_buildtree() or not self._buildtree_exists():
- return False
- else:
- return False
+ return self.__pull_pending
- # Pull is pending if artifact remote server available
- # and pull has not been attempted yet
- return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
- # _pull_done()
+ # _load_artifact_done()
#
- # Indicate that pull was attempted.
+ # Indicate that `_load_artifact()` has completed.
#
- # This needs to be called in the main process after a pull
+ # This needs to be called in the main process after `_load_artifact()`
# succeeds or fails so that we properly update the main
# process data model
#
# This will result in updating the element state.
#
- def _pull_done(self):
+ def _load_artifact_done(self):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- self.__pull_done = True
+ assert self.__artifact
- # Artifact may become cached after pulling, so let it query the
- # filesystem again to check
- self.__artifact.query_cache()
+ context = self._get_context()
+
+ if not context.get_strict() and self.__artifact.cached():
+ # In non-strict mode, strong cache key becomes available when
+ # the artifact is cached
+ self.__update_cache_key_non_strict()
- # We may not have actually pulled an artifact - the pull may
- # have failed. We might therefore need to schedule assembly.
- self.__schedule_assembly_when_necessary()
- # If we've finished pulling, an artifact might now exist
- # locally, so we might need to update a non-strict strong
- # cache key.
- self.__update_cache_key_non_strict()
self._update_ready_for_runtime_and_cached()
- # _pull():
+ self.__schedule_assembly_when_necessary()
+
+ if self.__can_query_cache_callback is not None:
+ self.__can_query_cache_callback(self)
+ self.__can_query_cache_callback = None
+
+ # _load_artifact():
#
- # Pull artifact from remote artifact repository into local artifact cache.
+ # Load artifact from cache or pull it from remote artifact repository.
#
# Returns: True if the artifact has been downloaded, False otherwise
#
- def _pull(self):
+ def _load_artifact(self, *, pull):
context = self._get_context()
- # Get optional specific subdir to pull and optional list to not pull
- # based off of user context
- pull_buildtrees = context.pull_buildtrees
+ pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
- # Attempt to pull artifact without knowing whether it's available
- strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
- if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
- # Notify successful download
- return True
+ # First check whether we already have the strict artifact in the local cache
+ artifact = Artifact(
+ self,
+ context,
+ strict_key=self.__strict_cache_key,
+ strong_key=self.__strict_cache_key,
+ weak_key=self.__weak_cache_key,
+ )
+ artifact.query_cache()
- if not context.get_strict() and not self._cached():
- # In non-strict mode also try pulling weak artifact
- # if no weak artifact is cached yet.
- artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
- return artifact.pull(pull_buildtrees=pull_buildtrees)
- else:
- # No artifact has been downloaded
+ self.__pull_pending = False
+ if not pull and not artifact.cached(buildtree=pull_buildtrees):
+ if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+ # Artifact is not completely available in cache and artifact remote server is available.
+ # Stop artifact loading here as pull is required to proceed.
+ self.__pull_pending = True
+
+ # Attempt to pull artifact with the strict cache key
+ pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ if artifact.cached() or context.get_strict():
+ self.__artifact = artifact
+ return pulled
+ elif self.__pull_pending:
return False
+ # In non-strict mode retry with weak cache key
+ artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+ artifact.query_cache()
+
+ # Attempt to pull artifact with the weak cache key
+ pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+ self.__artifact = artifact
+ return pulled
+
def _skip_source_push(self):
if not self.sources() or self._get_workspace():
return True
@@ -2384,7 +2383,7 @@ class Element(Plugin):
assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
if not self.__ready_for_runtime_and_cached:
- if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+ if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
self.__ready_for_runtime_and_cached = True
# Notify reverse dependencies
@@ -2871,10 +2870,13 @@ class Element(Plugin):
# Load bits which have been stored on the artifact
#
+ artifact.query_cache()
if artifact.cached():
self.__environment = artifact.load_environment()
self.__sandbox_config = artifact.load_sandbox_config()
self.__variables = artifact.load_variables()
+ else:
+ self.__pull_pending = True
self.__cache_key = artifact.strong_key
self.__strict_cache_key = artifact.strict_key
@@ -3257,58 +3259,14 @@ class Element(Plugin):
# If we've newly calculated a cache key, our artifact's
# current state will also change - after all, we can now find
# a potential existing artifact.
- self.__update_artifact_state()
+ self._load_artifact(pull=False)
+ if not self._pull_pending():
+ self._load_artifact_done()
# Update the message kwargs in use for this plugin to dispatch messages with
#
self._message_kwargs["element_key"] = self._get_display_key()
- # __update_artifact_state()
- #
- # Updates the data involved in knowing about the artifact corresponding
- # to this element.
- #
- # If the state changes, this will subsequently call
- # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
- # possible.
- #
- # Element.__update_cache_keys() must be called before this to have
- # meaningful results, because the element must know its cache key before
- # it can check whether an artifact exists for that cache key.
- #
- def __update_artifact_state(self):
- assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
- assert self.__artifact is None
-
- context = self._get_context()
-
- strict_artifact = Artifact(
- self,
- context,
- strong_key=self.__strict_cache_key,
- strict_key=self.__strict_cache_key,
- weak_key=self.__weak_cache_key,
- )
- strict_artifact.query_cache()
- if context.get_strict() or strict_artifact.cached():
- self.__artifact = strict_artifact
- else:
- self.__artifact = Artifact(
- self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
- )
- self.__artifact.query_cache()
-
- if not context.get_strict() and self.__artifact.cached():
- # In non-strict mode, strong cache key becomes available when
- # the artifact is cached
- self.__update_cache_key_non_strict()
-
- self.__schedule_assembly_when_necessary()
-
- if self.__can_query_cache_callback is not None:
- self.__can_query_cache_callback(self)
- self.__can_query_cache_callback = None
-
# __update_cache_key_non_strict()
#
# Calculates the strong cache key if it hasn't already been set.
@@ -3317,7 +3275,7 @@ class Element(Plugin):
# strict cache key, so no work needs to be done.
#
# When buildstream is not run in strict mode, this requires the artifact
- # state (as set in Element.__update_artifact_state()) to be set accordingly,
+ # state (as set in Element._load_artifact()) to be set accordingly,
# as the cache key can be loaded from the cache (possibly pulling from
# a remote cache).
#
[buildstream] 01/12: tests/frontend/push.py: Allow pushing of dependencies
Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 847703c48ad311a74d0028399b52fbd4fff90b56
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Sep 14 11:24:59 2020 +0200
tests/frontend/push.py: Allow pushing of dependencies
The assertions in `test_push_after_pull` are too strict. Pushing
dependencies to the second (empty) artifact server should not cause a
test failure.
---
tests/frontend/push.py | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/tests/frontend/push.py b/tests/frontend/push.py
index 4e39c22..2106b6d 100644
--- a/tests/frontend/push.py
+++ b/tests/frontend/push.py
@@ -372,8 +372,8 @@ def test_push_after_pull(cli, tmpdir, datafiles):
#
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == []
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" not in result.get_pushed_elements()
# Delete the artifact locally again.
cli.remove_artifact_from_cache(project, "target.bst")
@@ -383,8 +383,8 @@ def test_push_after_pull(cli, tmpdir, datafiles):
cli.configure({"artifacts": [{"url": share1.repo, "push": True}, {"url": share2.repo, "push": True},]})
result = cli.run(project=project, args=["build", "target.bst"])
result.assert_success()
- assert result.get_pulled_elements() == ["target.bst"]
- assert result.get_pushed_elements() == ["target.bst"]
+ assert "target.bst" in result.get_pulled_elements()
+ assert "target.bst" in result.get_pushed_elements()
# Ensure that when an artifact's size exceeds available disk space