You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2022/08/19 08:11:15 UTC

[buildstream] branch tristan/quit-only-imperative-jobs created (now dcea537bd)

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a change to branch tristan/quit-only-imperative-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git


      at dcea537bd tests/integration/cachedfail.py: Remove xfail statement about #534

This branch includes the following new commits:

     new 58564f258 _scheduler: Add concept of imperative queues
     new edda1d9dd _stream.py: Mark the imperative queues in tasks with multiple queues
     new dcea537bd tests/integration/cachedfail.py: Remove xfail statement about #534

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 02/03: _stream.py: Mark the imperative queues in tasks with multiple queues

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/quit-only-imperative-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit edda1d9ddb66a0b8d34b741e8faeb60e50af263d
Author: Tristan van Berkom <tr...@codethink.co.uk>
AuthorDate: Fri Aug 19 17:09:58 2022 +0900

    _stream.py: Mark the imperative queues in tasks with multiple queues
---
 src/buildstream/_stream.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py
index 428133839..5469b860f 100644
--- a/src/buildstream/_stream.py
+++ b/src/buildstream/_stream.py
@@ -423,7 +423,7 @@ class Stream:
 
         self._add_queue(FetchQueue(self._scheduler, skip_cached=True))
 
-        self._add_queue(BuildQueue(self._scheduler))
+        self._add_queue(BuildQueue(self._scheduler, imperative=True))
 
         if self._artifacts.has_push_remotes():
             self._add_queue(ArtifactPushQueue(self._scheduler, skip_uncached=True))
@@ -553,7 +553,7 @@ class Stream:
 
         self._add_queue(FetchQueue(self._scheduler))
 
-        self._add_queue(SourcePushQueue(self._scheduler))
+        self._add_queue(SourcePushQueue(self._scheduler, imperative=True))
 
         self._enqueue_plan(elements)
         self._run(announce_session=True)
@@ -652,7 +652,7 @@ class Stream:
 
         self._reset()
         self._add_queue(PullQueue(self._scheduler))
-        self._add_queue(ArtifactPushQueue(self._scheduler))
+        self._add_queue(ArtifactPushQueue(self._scheduler, imperative=True))
         self._enqueue_plan(elements)
         self._run(announce_session=True)
 


[buildstream] 03/03: tests/integration/cachedfail.py: Remove xfail statement about #534

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/quit-only-imperative-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit dcea537bd0a9f76102e933824579c99e2cc86a3c
Author: Tristan van Berkom <tr...@codethink.co.uk>
AuthorDate: Fri Aug 19 17:10:34 2022 +0900

    tests/integration/cachedfail.py: Remove xfail statement about #534
---
 tests/integration/cachedfail.py | 3 ---
 1 file changed, 3 deletions(-)

diff --git a/tests/integration/cachedfail.py b/tests/integration/cachedfail.py
index da1fff6c2..7711362d5 100644
--- a/tests/integration/cachedfail.py
+++ b/tests/integration/cachedfail.py
@@ -138,9 +138,6 @@ def test_build_depend_on_cached_fail(cli, datafiles):
 @pytest.mark.datafiles(DATA_DIR)
 @pytest.mark.parametrize("on_error", ("continue", "quit"))
 def test_push_cached_fail(cli, tmpdir, datafiles, on_error):
-    if on_error == "quit":
-        pytest.xfail("https://gitlab.com/BuildStream/buildstream/issues/534")
-
     project = str(datafiles)
     element_path = os.path.join(project, "elements", "element.bst")
 


[buildstream] 01/03: _scheduler: Add concept of imperative queues

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch tristan/quit-only-imperative-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 58564f258f0a2f1cf8b2f20b8ada2748c8b7ba44
Author: Tristan van Berkom <tr...@codethink.co.uk>
AuthorDate: Fri Aug 19 17:07:55 2022 +0900

    _scheduler: Add concept of imperative queues
    
    Impacts the behavior of quitting the scheduler (as opposed to terminating it).
    
    If Scheduler.stop() is called, then we will stop processing jobs from
    the imperative queue or any preceding queues and only continue to process
    post-imperative queues.
    
    Related to #534
---
 src/buildstream/_scheduler/queues/queue.py |  7 ++++++-
 src/buildstream/_scheduler/scheduler.py    | 28 ++++++++++++++++++++++------
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 76688e75d..e9f934d57 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -64,7 +64,7 @@ class Queue:
     # Resources this queues' jobs want
     resources = []  # type: List[int]
 
-    def __init__(self, scheduler):
+    def __init__(self, scheduler, *, imperative=False):
 
         #
         # Private members
@@ -78,6 +78,11 @@ class Queue:
 
         self._required_element_check = False  # Whether we should check that elements are required before enqueuing
 
+        #
+        # Public members
+        #
+        self.imperative = imperative
+
         # Assert the subclass has setup class data
         assert self.action_name is not None
         assert self.complete_name is not None
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d8e1c0f3..bba7de703 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -266,8 +266,8 @@ class Scheduler:
 
     # stop()
     #
-    # Stop queueing additional jobs, causes Scheduler.run()
-    # to return once all currently processing jobs are finished.
+    # Stop queueing additional imperative jobs, causes Scheduler.run()
+    # to return once all post-imperative jobs are finished.
     #
     def stop(self):
         self._queue_jobs = False
@@ -354,11 +354,27 @@ class Scheduler:
         ready = []
         process_queues = True
 
-        while self._queue_jobs and process_queues:
+        if self._queue_jobs:
+            # Scheduler.stop() was not called, consider all queues.
+            queues = self.queues
+        else:
+            # Limit processing to post-imperative queues
+            for queue in self.queues:
+                if queue.imperative:
+                    # Here the `queues` list will consist of the imperative queue along with
+                    # any subsequent queues; this means elements will be carried only from the
+                    # imperative queue onwards, and only post-imperative jobs will be processed.
+                    queues = self.queues[self.queues.index(queue) :]
+                    break
+            else:
+                # No imperative queue was marked, stop queueing any jobs
+                queues = []
+
+        while process_queues:
 
             # Pull elements forward through queues
             elements = []
-            for queue in self.queues:
+            for queue in queues:
                 queue.enqueue(elements)
                 elements = list(queue.dequeue())
 
@@ -373,13 +389,13 @@ class Scheduler:
             # to fetch tasks for elements which failed to pull, and
             # thus need all the pulls to complete before ever starting
             # a build
-            ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(self.queues)))
+            ready.extend(chain.from_iterable(q.harvest_jobs() for q in reversed(queues)))
 
             # harvest_jobs() may have decided to skip some jobs, making
             # them eligible for promotion to the next queue as a side effect.
             #
             # If that happens, do another round.
-            process_queues = any(q.dequeue_ready() for q in self.queues)
+            process_queues = any(q.dequeue_ready() for q in queues)
 
         # Start the jobs
         #