You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:43:53 UTC

[buildstream] 07/12: element.py: Combine cache query and pull into `_load_artifact()`

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch juerg/cache-query-job-benchmark
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 515eab747f44f4f19d0598d6b8e7cd9a339d52ca
Author: Jürg Billeter <j...@bitron.ch>
AuthorDate: Mon Dec 14 21:36:08 2020 +0100

    element.py: Combine cache query and pull into `_load_artifact()`
    
    In non-strict mode, cache query and pull are intertwined, as we prefer
    pulling the strict artifact over a cache lookup with the weak cache key.
    
    This replaces `__update_artifact_state()` and `_pull()` with a combined
    `_load_artifact()` method, which supports cache query with deferred
    pulling. This provides correct behavior with the flexibility of split or
    combined cache query and pull.
---
 src/buildstream/_artifact.py                   |   7 +-
 src/buildstream/_frontend/widget.py            |   2 +-
 src/buildstream/_scheduler/queues/pullqueue.py |  13 +-
 src/buildstream/element.py                     | 170 ++++++++++---------------
 4 files changed, 72 insertions(+), 120 deletions(-)

diff --git a/src/buildstream/_artifact.py b/src/buildstream/_artifact.py
index b63cff6..da1e03a 100644
--- a/src/buildstream/_artifact.py
+++ b/src/buildstream/_artifact.py
@@ -591,9 +591,12 @@ class Artifact:
     # Returns:
     #     (bool): Whether artifact is in local cache
     #
-    def cached(self):
+    def cached(self, *, buildtree=False):
         assert self._cached is not None
-        return self._cached
+        ret = self._cached
+        if buildtree:
+            ret = ret and (self.cached_buildtree() or not self.buildtree_exists())
+        return ret
 
     # cached_logs()
     #
diff --git a/src/buildstream/_frontend/widget.py b/src/buildstream/_frontend/widget.py
index ad6c813..3ef64d3 100644
--- a/src/buildstream/_frontend/widget.py
+++ b/src/buildstream/_frontend/widget.py
@@ -358,7 +358,7 @@ class LogLine(Widget):
                     if element.get_kind() == "junction":
                         line = p.fmt_subst(line, "state", "junction", fg="magenta")
                     elif not element._can_query_cache():
-                        line = p.fmt_subst(line, "state", "unknown", fg="bright_black")
+                        line = p.fmt_subst(line, "state", "waiting", fg="blue")
                     elif element._cached_failure():
                         line = p.fmt_subst(line, "state", "failed", fg="red")
                     elif element._cached_success():
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index e1d6959..ecff02c 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -37,9 +37,6 @@ class PullQueue(Queue):
         return PullQueue._pull_or_skip
 
     def status(self, element):
-        if not element._can_query_cache():
-            return QueueStatus.PENDING
-
         if element._pull_pending():
             return QueueStatus.READY
         else:
@@ -50,15 +47,9 @@ class PullQueue(Queue):
         if status is JobStatus.FAIL:
             return
 
-        element._pull_done()
-
-    def register_pending_element(self, element):
-        # Set a "can_query_cache"_callback for an element which is not
-        # immediately ready to query the artifact cache so that it
-        # may be pulled.
-        element._set_can_query_cache_callback(self._enqueue_element)
+        element._load_artifact_done()
 
     @staticmethod
     def _pull_or_skip(element):
-        if not element._pull():
+        if not element._load_artifact(pull=True):
             raise SkipJob(PullQueue.action_name)
diff --git a/src/buildstream/element.py b/src/buildstream/element.py
index c4f0479..f5f8a71 100644
--- a/src/buildstream/element.py
+++ b/src/buildstream/element.py
@@ -290,7 +290,7 @@ class Element(Plugin):
         self.__sourcecache = context.sourcecache  # Source cache
         self.__assemble_scheduled = False  # Element is scheduled to be assembled
         self.__assemble_done = False  # Element is assembled
-        self.__pull_done = False  # Whether pull was attempted
+        self.__pull_pending = False  # Whether pull is pending
         self.__cached_successfully = None  # If the Element is known to be successfully cached
         self.__splits = None  # Resolved regex objects for computing split domains
         self.__whitelist_regex = None  # Resolved regex object to check if file is allowed to overlap
@@ -1332,9 +1332,6 @@ class Element(Plugin):
     #
     # - __update_cache_keys()
     #   - Computes the strong and weak cache keys.
-    # - _update_artifact_state()
-    #   - Computes the state of the element's artifact using the
-    #     cache key.
     # - __schedule_assembly_when_necessary()
     #   - Schedules assembly of an element, iff its current state
     #     allows/necessitates it
@@ -1869,83 +1866,85 @@ class Element(Plugin):
     #   (bool): Whether a pull operation is pending
     #
     def _pull_pending(self):
-        if self._get_workspace():
-            # Workspace builds are never pushed to artifact servers
-            return False
-
-        # Check whether the pull has been invoked with a specific subdir requested
-        # in user context, as to complete a partial artifact
-        pull_buildtrees = self._get_context().pull_buildtrees
-
-        if self._cached() and self.__artifact._cache_key == self.__strict_cache_key:
-            if pull_buildtrees:
-                # If we've specified a subdir, check if the subdir is cached locally
-                # or if it's possible to get
-                if self._cached_buildtree() or not self._buildtree_exists():
-                    return False
-            else:
-                return False
+        return self.__pull_pending
 
-        # Pull is pending if artifact remote server available
-        # and pull has not been attempted yet
-        return self.__artifacts.has_fetch_remotes(plugin=self) and not self.__pull_done
-
-    # _pull_done()
+    # _load_artifact_done()
     #
-    # Indicate that pull was attempted.
+    # Indicate that `_load_artifact()` has completed.
     #
-    # This needs to be called in the main process after a pull
+    # This needs to be called in the main process after `_load_artifact()`
     # succeeds or fails so that we properly update the main
     # process data model
     #
     # This will result in updating the element state.
     #
-    def _pull_done(self):
+    def _load_artifact_done(self):
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
-        self.__pull_done = True
+        assert self.__artifact
 
-        # Artifact may become cached after pulling, so let it query the
-        # filesystem again to check
-        self.__artifact.query_cache()
+        context = self._get_context()
+
+        if not context.get_strict() and self.__artifact.cached():
+            # In non-strict mode, strong cache key becomes available when
+            # the artifact is cached
+            self.__update_cache_key_non_strict()
 
-        # We may not have actually pulled an artifact - the pull may
-        # have failed. We might therefore need to schedule assembly.
-        self.__schedule_assembly_when_necessary()
-        # If we've finished pulling, an artifact might now exist
-        # locally, so we might need to update a non-strict strong
-        # cache key.
-        self.__update_cache_key_non_strict()
         self._update_ready_for_runtime_and_cached()
 
-    # _pull():
+        self.__schedule_assembly_when_necessary()
+
+        if self.__can_query_cache_callback is not None:
+            self.__can_query_cache_callback(self)
+            self.__can_query_cache_callback = None
+
+    # _load_artifact():
     #
-    # Pull artifact from remote artifact repository into local artifact cache.
+    # Load artifact from cache or pull it from remote artifact repository.
     #
     # Returns: True if the artifact has been downloaded, False otherwise
     #
-    def _pull(self):
+    def _load_artifact(self, *, pull):
         context = self._get_context()
 
-        # Get optional specific subdir to pull and optional list to not pull
-        # based off of user context
-        pull_buildtrees = context.pull_buildtrees
+        pull_buildtrees = context.pull_buildtrees and not self._get_workspace()
 
-        # Attempt to pull artifact without knowing whether it's available
-        strict_artifact = Artifact(self, context, strong_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
-        if strict_artifact.pull(pull_buildtrees=pull_buildtrees):
-            # Notify successful download
-            return True
+        # First check whether we already have the strict artifact in the local cache
+        artifact = Artifact(
+            self,
+            context,
+            strict_key=self.__strict_cache_key,
+            strong_key=self.__strict_cache_key,
+            weak_key=self.__weak_cache_key,
+        )
+        artifact.query_cache()
 
-        if not context.get_strict() and not self._cached():
-            # In non-strict mode also try pulling weak artifact
-            # if no weak artifact is cached yet.
-            artifact = Artifact(self, context, weak_key=self.__weak_cache_key)
-            return artifact.pull(pull_buildtrees=pull_buildtrees)
-        else:
-            # No artifact has been downloaded
+        self.__pull_pending = False
+        if not pull and not artifact.cached(buildtree=pull_buildtrees):
+            if self.__artifacts.has_fetch_remotes(plugin=self) and not self._get_workspace():
+                # Artifact is not completely available in cache and artifact remote server is available.
+                # Stop artifact loading here as pull is required to proceed.
+                self.__pull_pending = True
+
+        # Attempt to pull artifact with the strict cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        if artifact.cached() or context.get_strict():
+            self.__artifact = artifact
+            return pulled
+        elif self.__pull_pending:
             return False
 
+        # In non-strict mode retry with weak cache key
+        artifact = Artifact(self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key)
+        artifact.query_cache()
+
+        # Attempt to pull artifact with the weak cache key
+        pulled = pull and artifact.pull(pull_buildtrees=pull_buildtrees)
+
+        self.__artifact = artifact
+        return pulled
+
     def _skip_source_push(self):
         if not self.sources() or self._get_workspace():
             return True
@@ -2384,7 +2383,7 @@ class Element(Plugin):
         assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
 
         if not self.__ready_for_runtime_and_cached:
-            if self.__runtime_deps_uncached == 0 and self.__cache_key and self._cached_success():
+            if self.__runtime_deps_uncached == 0 and self.__artifact and self.__cache_key and self._cached_success():
                 self.__ready_for_runtime_and_cached = True
 
                 # Notify reverse dependencies
@@ -2871,10 +2870,13 @@ class Element(Plugin):
 
         # Load bits which have been stored on the artifact
         #
+        artifact.query_cache()
         if artifact.cached():
             self.__environment = artifact.load_environment()
             self.__sandbox_config = artifact.load_sandbox_config()
             self.__variables = artifact.load_variables()
+        else:
+            self.__pull_pending = True
 
         self.__cache_key = artifact.strong_key
         self.__strict_cache_key = artifact.strict_key
@@ -3257,58 +3259,14 @@ class Element(Plugin):
         # If we've newly calculated a cache key, our artifact's
         # current state will also change - after all, we can now find
         # a potential existing artifact.
-        self.__update_artifact_state()
+        self._load_artifact(pull=False)
+        if not self._pull_pending():
+            self._load_artifact_done()
 
         # Update the message kwargs in use for this plugin to dispatch messages with
         #
         self._message_kwargs["element_key"] = self._get_display_key()
 
-    # __update_artifact_state()
-    #
-    # Updates the data involved in knowing about the artifact corresponding
-    # to this element.
-    #
-    # If the state changes, this will subsequently call
-    # `self.__schedule_assembly_when_necessary()` to schedule assembly if it becomes
-    # possible.
-    #
-    # Element.__update_cache_keys() must be called before this to have
-    # meaningful results, because the element must know its cache key before
-    # it can check whether an artifact exists for that cache key.
-    #
-    def __update_artifact_state(self):
-        assert utils._is_in_main_thread(), "This has an impact on all elements and must be run in the main thread"
-        assert self.__artifact is None
-
-        context = self._get_context()
-
-        strict_artifact = Artifact(
-            self,
-            context,
-            strong_key=self.__strict_cache_key,
-            strict_key=self.__strict_cache_key,
-            weak_key=self.__weak_cache_key,
-        )
-        strict_artifact.query_cache()
-        if context.get_strict() or strict_artifact.cached():
-            self.__artifact = strict_artifact
-        else:
-            self.__artifact = Artifact(
-                self, context, strict_key=self.__strict_cache_key, weak_key=self.__weak_cache_key
-            )
-            self.__artifact.query_cache()
-
-        if not context.get_strict() and self.__artifact.cached():
-            # In non-strict mode, strong cache key becomes available when
-            # the artifact is cached
-            self.__update_cache_key_non_strict()
-
-        self.__schedule_assembly_when_necessary()
-
-        if self.__can_query_cache_callback is not None:
-            self.__can_query_cache_callback(self)
-            self.__can_query_cache_callback = None
-
     # __update_cache_key_non_strict()
     #
     # Calculates the strong cache key if it hasn't already been set.
@@ -3317,7 +3275,7 @@ class Element(Plugin):
     # strict cache key, so no work needs to be done.
     #
     # When buildstream is not run in strict mode, this requires the artifact
-    # state (as set in Element.__update_artifact_state()) to be set accordingly,
+    # state (as set in Element._load_artifact()) to be set accordingly,
     # as the cache key can be loaded from the cache (possibly pulling from
     # a remote cache).
     #