You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:56:13 UTC

[buildstream] 04/04: buildstream/_scheduler/*.py: Make job submission a queue job

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 92a6e3464fdb29181556c2f12d0cc6cc99774ff4
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Thu Apr 26 15:48:29 2018 +0100

    buildstream/_scheduler/*.py: Make job submission a queue job
---
 buildstream/_scheduler/jobs/job.py     |  6 ++-
 buildstream/_scheduler/queues/queue.py | 23 ++++-----
 buildstream/_scheduler/scheduler.py    | 92 ++++++++++++++++------------------
 3 files changed, 58 insertions(+), 63 deletions(-)

diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 6728ae0..9335cf9 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -67,13 +67,14 @@ class Process(multiprocessing.Process):
 #
 class Job():
 
-    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+    def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0):
 
         #
         # Public members
         #
         self.action_name = action_name   # The action name for the Queue
-        self.child_data = None
+        self.child_data = None           # Data to be sent to the main process
+        self.job_type = job_type         # The type of the job
 
         #
         # Private members
@@ -540,6 +541,7 @@ class Job():
             return
 
         self._parent_complete(returncode == 0, self._result)
+        self._scheduler.job_completed(self)
 
     # _parent_process_envelope()
     #
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 648bebb..434ca87 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -78,7 +78,6 @@ class Queue():
         #
         # Public members
         #
-        self.active_jobs = []          # List of active ongoing Jobs, for scheduler observation
         self.failed_elements = []      # List of failed elements, for the frontend
         self.processed_elements = []   # List of processed elements, for the frontend
         self.skipped_elements = []     # List of skipped elements, for the frontend
@@ -224,6 +223,7 @@ class Queue():
     def process_ready(self):
         scheduler = self._scheduler
         unready = []
+        ready = []
 
         while self._wait_queue and scheduler.get_job_token(self.queue_type):
             element = self._wait_queue.popleft()
@@ -242,20 +242,24 @@ class Queue():
             logfile = self._element_log_path(element)
             self.prepare(element)
 
-            job = ElementJob(scheduler, self.action_name,
+            job = ElementJob(scheduler, self.queue_type,
+                             self.action_name,
                              logfile, element=element,
                              action_cb=self.process,
                              complete_cb=self._job_done,
                              max_retries=self._max_retries)
-            scheduler.job_starting(job, element)
+            ready.append(job)
 
-            job.spawn()
-            self.active_jobs.append(job)
+            # Notify the frontend
+            if self._scheduler._job_start_callback:
+                self._scheduler._job_start_callback(element, self.action_name)
 
         # These were not ready but were in the beginning, give em
         # first priority again next time around
         self._wait_queue.extendleft(unready)
 
+        return ready
+
     #####################################################
     #                 Private Methods                   #
     #####################################################
@@ -299,9 +303,6 @@ class Queue():
     #
     def _job_done(self, job, element, success, result):
 
-        # Remove from our jobs
-        self.active_jobs.remove(job)
-
         # Update workspaces in the main task before calling any queue implementation
         self._update_workspaces(element, job)
 
@@ -347,13 +348,11 @@ class Queue():
                 self.failed_elements.append(element)
 
         # Give the token for this job back to the scheduler
-        # immediately before invoking another round of scheduling
         self._scheduler.put_job_token(self.queue_type)
 
         # Notify frontend
-        self._scheduler.job_completed(self, job, element, success)
-
-        self._scheduler.sched()
+        if self._scheduler._job_complete_callback:
+            self._scheduler._job_complete_callback(element, self, job.action_name, success)
 
     # Convenience wrapper for Queue implementations to send
     # a message for the element they are processing
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 61cfc11..8187c7a 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -71,8 +71,10 @@ class Scheduler():
         #
         # Public members
         #
-        self.queues = None          # Exposed for the frontend to print summaries
+        self.active_jobs = []       # Jobs currently being run in the scheduler
         self.context = context      # The Context object shared with Queues
+        self.queues = None
+        self.queue_runner = None    # The QueueRunner that delivers jobs to schedule
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
         self.suspended = False      # Whether the scheduler is currently suspended
 
@@ -83,7 +85,6 @@ class Scheduler():
         #
         # Private members
         #
-        self._runners = []
         self._interrupt_callback = interrupt_callback
         self._ticker_callback = ticker_callback
         self._job_start_callback = job_start_callback
@@ -116,8 +117,8 @@ class Scheduler():
     #
     def run(self, queues):
 
+        self.queue_runner = QueueRunner(self, queues)
         # Hold on to the queues to process
-        self._runners.append(QueueRunner(self, queues))
         self.queues = queues
 
         # Ensure that we have a fresh new event loop, in case we want
@@ -224,17 +225,38 @@ class Scheduler():
     # and process anything that is ready.
     #
     def sched(self):
-        for runner in self._runners:
-            runner.schedule_jobs()
+        jobs = self.queue_runner.schedule_jobs()
+        self.run_jobs(jobs)
 
         # If nothings ticking, time to bail out
-        ticking = 0
-        for queue in self.queues:
-            ticking += len(queue.active_jobs)
-
-        if ticking == 0:
+        if not self.active_jobs:
             self.loop.stop()
 
+    # run_jobs():
+    #
+    # Spawn the given jobs and track them in active_jobs until they complete.
+    #
+    # Args:
+    #    jobs (typing.Iterable[Job]): The jobs to spawn
+    #
+    def run_jobs(self, jobs):
+        for job in jobs:
+            job.spawn()
+            self.active_jobs.append(job)
+
+    # job_completed():
+    #
+    # Called by a Job after it has finished (following _parent_complete());
+    # stops tracking the job and runs another round of scheduling via
+    # sched(), which stops the event loop if no jobs remain active.
+    #
+    # Args:
+    #    job (Job): The completed Job
+    #
+    def job_completed(self, job):
+        self.active_jobs.remove(job)
+        self.sched()
+
     # get_job_token():
     #
     # Used by the Queue object to obtain a token for
@@ -266,30 +288,6 @@ class Scheduler():
     def put_job_token(self, queue_type):
         self._job_tokens[queue_type] += 1
 
-    # job_starting():
-    #
-    # Called by the Queue when starting a Job
-    #
-    # Args:
-    #    job (Job): The starting Job
-    #
-    def job_starting(self, job, element):
-        if self._job_start_callback:
-            self._job_start_callback(element, job.action_name)
-
-    # job_completed():
-    #
-    # Called by the Queue when a Job completes
-    #
-    # Args:
-    #    queue (Queue): The Queue holding a complete job
-    #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
-    #
-    def job_completed(self, queue, job, element, success):
-        if self._job_complete_callback:
-            self._job_complete_callback(element, queue, job.action_name, success)
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -302,9 +300,8 @@ class Scheduler():
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.suspend()
+            for job in self.active_jobs:
+                job.suspend()
 
     # _resume_jobs()
     #
@@ -312,9 +309,8 @@ class Scheduler():
     #
     def _resume_jobs(self):
         if self.suspended:
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.resume()
+            for job in self.active_jobs:
+                job.resume()
             self.suspended = False
             self._starttime += (datetime.datetime.now() - self._suspendtime)
             self._suspendtime = None
@@ -377,17 +373,15 @@ class Scheduler():
         wait_limit = 20.0
 
         # First tell all jobs to terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                job.terminate()
+        for job in self.active_jobs:
+            job.terminate()
 
         # Now wait for them to really terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                elapsed = datetime.datetime.now() - wait_start
-                timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
-                if not job.terminate_wait(timeout):
-                    job.kill()
+        for job in self.active_jobs:
+            elapsed = datetime.datetime.now() - wait_start
+            timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+            if not job.terminate_wait(timeout):
+                job.kill()
 
         self.loop.stop()