You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:30:01 UTC

[buildstream] 03/32: buildstream/_scheduler/*.py: Make job submission a queue job

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch testing/local-cache-expiry
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit bc4cc296380f165da8eeb6d335ef6cb5b1f4f106
Author: Tristan Maat <tm...@tlater.net>
AuthorDate: Sun Jun 24 22:14:49 2018 +0000

    buildstream/_scheduler/*.py: Make job submission a queue job
---
 buildstream/_scheduler/jobs/job.py          |  15 ++-
 buildstream/_scheduler/queues/buildqueue.py |   3 +
 buildstream/_scheduler/queues/fetchqueue.py |   2 +
 buildstream/_scheduler/queues/pullqueue.py  |   2 +
 buildstream/_scheduler/queues/pushqueue.py  |   2 +
 buildstream/_scheduler/queues/queue.py      |  14 +--
 buildstream/_scheduler/queues/trackqueue.py |   2 +
 buildstream/_scheduler/scheduler.py         | 179 ++++++++++++++++++++--------
 8 files changed, 154 insertions(+), 65 deletions(-)

diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index c567b6f..84be452 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -52,6 +52,14 @@ class Process(multiprocessing.Process):
         self._sentinel = self._popen.sentinel
 
 
+class JobType():
+    FETCH = 1
+    TRACK = 2
+    BUILD = 3
+    PULL = 4
+    PUSH = 5
+
+
 # Job()
 #
 # The Job object represents a parallel task, when calling Job.spawn(),
@@ -61,6 +69,7 @@ class Process(multiprocessing.Process):
 #
 # Args:
 #    scheduler (Scheduler): The scheduler
+#    job_type (JobType): The type of the job
 #    action_name (str): The queue action name
 #    logfile (str): A template string that points to the logfile
 #                   that should be used - should contain {pid}.
@@ -68,13 +77,14 @@ class Process(multiprocessing.Process):
 #
 class Job():
 
-    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+    def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0):
 
         #
         # Public members
         #
         self.action_name = action_name   # The action name for the Queue
-        self.child_data = None
+        self.child_data = None           # Data to be sent to the main process
+        self.job_type = job_type         # The type of the job
 
         #
         # Private members
@@ -541,6 +551,7 @@ class Job():
             return
 
         self._parent_complete(returncode == 0, self._result)
+        self._scheduler.job_completed(self)
 
     # _parent_process_envelope()
     #
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 50ba312..0c75538 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -18,7 +18,9 @@
 #        Tristan Van Berkom <tr...@codethink.co.uk>
 #        Jürg Billeter <ju...@codethink.co.uk>
 
+import os
 from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
 
 
 # A queue which assembles elements
@@ -28,6 +30,7 @@ class BuildQueue(Queue):
     action_name = "Build"
     complete_name = "Built"
     queue_type = QueueType.BUILD
+    job_type = JobType.BUILD
 
     def process(self, element):
         element._assemble()
diff --git a/buildstream/_scheduler/queues/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
index bdff156..2438a9a 100644
--- a/buildstream/_scheduler/queues/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -23,6 +23,7 @@ from ... import Consistency
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
 
 
 # A queue which fetches element sources
@@ -32,6 +33,7 @@ class FetchQueue(Queue):
     action_name = "Fetch"
     complete_name = "Fetched"
     queue_type = QueueType.FETCH
+    job_type = JobType.FETCH
 
     def __init__(self, scheduler, skip_cached=False):
         super().__init__(scheduler)
diff --git a/buildstream/_scheduler/queues/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
index b4f5b0d..1fc4364 100644
--- a/buildstream/_scheduler/queues/pullqueue.py
+++ b/buildstream/_scheduler/queues/pullqueue.py
@@ -20,6 +20,7 @@
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
 
 
 # A queue which pulls element artifacts
@@ -29,6 +30,7 @@ class PullQueue(Queue):
     action_name = "Pull"
     complete_name = "Pulled"
     queue_type = QueueType.FETCH
+    job_type = JobType.PULL
 
     def process(self, element):
         # returns whether an artifact was downloaded or not
diff --git a/buildstream/_scheduler/queues/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
index 624eefd..aa5540e 100644
--- a/buildstream/_scheduler/queues/pushqueue.py
+++ b/buildstream/_scheduler/queues/pushqueue.py
@@ -20,6 +20,7 @@
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
 
 
 # A queue which pushes element artifacts
@@ -29,6 +30,7 @@ class PushQueue(Queue):
     action_name = "Push"
     complete_name = "Pushed"
     queue_type = QueueType.PUSH
+    job_type = JobType.PUSH
 
     def process(self, element):
         # returns whether an artifact was uploaded or not
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 7f115b4..39e431b 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -71,6 +71,7 @@ class Queue():
     action_name = None
     complete_name = None
     queue_type = None
+    job_type = None
 
     def __init__(self, scheduler):
 
@@ -241,16 +242,14 @@ class Queue():
             logfile = self._element_log_path(element)
             self.prepare(element)
 
-            job = ElementJob(scheduler, self.action_name,
-                             logfile, element=element,
+            job = ElementJob(scheduler, self.job_type,
+                             self.action_name, logfile,
+                             element=element, queue=self,
                              action_cb=self.process,
                              complete_cb=self._job_done,
                              max_retries=self._max_retries)
             scheduler.job_starting(job, element)
 
-            job.spawn()
-            self.active_jobs.append(job)
-
         # These were not ready but were in the beginning, give em
         # first priority again next time around
         self._wait_queue.extendleft(unready)
@@ -350,11 +349,6 @@ class Queue():
         # immediately before invoking another round of scheduling
         self._scheduler.put_job_token(self.queue_type)
 
-        # Notify frontend
-        self._scheduler.job_completed(self, job, element, success)
-
-        self._scheduler.sched()
-
     # Convenience wrapper for Queue implementations to send
     # a message for the element they are processing
     def _message(self, element, message_type, brief, **kwargs):
diff --git a/buildstream/_scheduler/queues/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
index 3a65f01..a371e52 100644
--- a/buildstream/_scheduler/queues/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -24,6 +24,7 @@ from ... import SourceError
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
+from ..jobs import JobType
 
 
 # A queue which tracks sources
@@ -33,6 +34,7 @@ class TrackQueue(Queue):
     action_name = "Track"
     complete_name = "Tracked"
     queue_type = QueueType.FETCH
+    job_type = JobType.TRACK
 
     def process(self, element):
         return element._track()
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index a7a3f95..ffbd656 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -21,11 +21,13 @@
 # System imports
 import os
 import asyncio
+from itertools import chain
 import signal
 import datetime
 from contextlib import contextmanager
 
 # Local imports
+from .jobs import JobType
 from .queues import QueueType
 
 
@@ -36,6 +38,59 @@ class SchedStatus():
     TERMINATED = 1
 
 
+# A set of rules that dictates in which order jobs should run.
+#
+# The 'priority' tuple defines jobs that are not allowed to be executed
+# before the current job completes (even if the job is still waiting
+# to be executed).
+#
+# The 'exclusive' tuple defines jobs that the current job is not allowed to
+# be run in parallel with.
+#
+# Note that this is very different from the element job
+# dependencies. Both a build and fetch job can be ready at the same
+# time, this has nothing to do with the requirement to fetch sources
+# before building. These rules are purely in place to maintain cache
+# consistency.
+#
+JOB_RULES = {
+    JobType.CLEAN: {
+        # Build and pull jobs are not allowed to run when we are about
+        # to start a cleanup job, because they will add more data to
+        # the artifact cache.
+        'priority': (JobType.BUILD, JobType.PULL),
+        # Cleanup jobs are not allowed to run in parallel with any
+        # jobs that might need to access the artifact cache, because
+        # we cannot guarantee atomicity otherwise.
+        'exclusive': (JobType.BUILD, JobType.PULL, JobType.PUSH)
+    },
+    JobType.BUILD: {
+        'priority': (),
+        'exclusive': ()
+    },
+    JobType.FETCH: {
+        'priority': (),
+        'exclusive': ()
+    },
+    JobType.PULL: {
+        'priority': (),
+        'exclusive': ()
+    },
+    JobType.PUSH: {
+        'priority': (),
+        'exclusive': ()
+    },
+    JobType.SIZE: {
+        'priority': (),
+        'exclusive': ()
+    },
+    JobType.TRACK: {
+        'priority': (),
+        'exclusive': ()
+    }
+}
+
+
 # Scheduler()
 #
 # The scheduler operates on a list queues, each of which is meant to accomplish
@@ -69,6 +124,8 @@ class Scheduler():
         #
         # Public members
         #
+        self.waiting_jobs = []      # Jobs waiting for execution
+        self.active_jobs = []       # Jobs currently being run in the scheduler
         self.queues = None          # Exposed for the frontend to print summaries
         self.context = context      # The Context object shared with Queues
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
@@ -129,7 +186,7 @@ class Scheduler():
         self._connect_signals()
 
         # Run the queues
-        self.sched()
+        self.schedule_queue_jobs()
         self.loop.run_forever()
         self.loop.close()
 
@@ -220,11 +277,38 @@ class Scheduler():
     # and process anything that is ready.
     #
     def sched(self):
+        for job in self.waiting_jobs:
+            # If our job is not allowed to run with any job currently
+            # running, we don't start it.
+            if any(running_job.job_type in JOB_RULES[job.job_type]['exclusive']
+                   for running_job in self.active_jobs):
+                continue
+
+            # If any job currently waiting has priority over this one,
+            # we don't start it.
+            if any(job.job_type in JOB_RULES[waiting_job.job_type]['priority']
+                   for waiting_job in self.waiting_jobs):
+                continue
+
+            job.spawn()
+            self.waiting_jobs.remove(job)
+            self.active_jobs.append(job)
+
+            if self._job_start_callback:
+                self._job_start_callback(job)
+
+        # If nothing's ticking, time to bail out
+        if not self.active_jobs and not self.waiting_jobs:
+            self.loop.stop()
+
+    def schedule_jobs(self, jobs):
+        self.waiting_jobs.extend(jobs)
 
+    def schedule_queue_jobs(self):
+        ready = []
         process_queues = True
 
         while self._queue_jobs and process_queues:
-
             # Pull elements forward through queues
             elements = []
             for queue in self.queues:
@@ -233,31 +317,42 @@ class Scheduler():
 
                 # Dequeue processed elements for the next queue
                 elements = list(queue.dequeue())
-                elements = list(elements)
 
             # Kickoff whatever processes can be processed at this time
             #
-            # We start by queuing from the last queue first, because we want to
-            # give priority to queues later in the scheduling process in the case
-            # that multiple queues share the same token type.
+            # We start by queuing from the last queue first, because
+            # we want to give priority to queues later in the
+            # scheduling process in the case that multiple queues
+            # share the same token type.
             #
-            # This avoids starvation situations where we dont move on to fetch
-            # tasks for elements which failed to pull, and thus need all the pulls
-            # to complete before ever starting a build
-            for queue in reversed(self.queues):
-                queue.process_ready()
-
-            # process_ready() may have skipped jobs, adding them to the done_queue.
-            # Pull these skipped elements forward to the next queue and process them.
+            # This avoids starvation situations where we don't move on
+            # to fetch tasks for elements which failed to pull, and
+            # thus need all the pulls to complete before ever starting
+            # a build
+            ready.extend(chain.from_iterable(
+                queue.process_ready() for queue in reversed(self.queues)
+            ))
+
+            # process_ready() may have skipped jobs, adding them to
+            # the done_queue.  Pull these skipped elements forward to
+            # the next queue and process them.
             process_queues = any(q.dequeue_ready() for q in self.queues)
 
-        # If nothings ticking, time to bail out
-        ticking = 0
-        for queue in self.queues:
-            ticking += len(queue.active_jobs)
+        self.schedule_jobs(ready)
+        self.sched()
 
-        if ticking == 0:
-            self.loop.stop()
+    # job_completed():
+    #
+    # Called when a Job completes
+    #
+    # Args:
+    #    job (Job): The completed Job; it is removed from the
+    #               list of active jobs, after which the queues
+    #               are asked to schedule further jobs
+    #
+    def job_completed(self, job):
+        self.active_jobs.remove(job)
+        self.schedule_queue_jobs()
 
     # get_job_token():
     #
@@ -290,30 +385,6 @@ class Scheduler():
     def put_job_token(self, queue_type):
         self._job_tokens[queue_type] += 1
 
-    # job_starting():
-    #
-    # Called by the Queue when starting a Job
-    #
-    # Args:
-    #    job (Job): The starting Job
-    #
-    def job_starting(self, job, element):
-        if self._job_start_callback:
-            self._job_start_callback(element, job.action_name)
-
-    # job_completed():
-    #
-    # Called by the Queue when a Job completes
-    #
-    # Args:
-    #    queue (Queue): The Queue holding a complete job
-    #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
-    #
-    def job_completed(self, queue, job, element, success):
-        if self._job_complete_callback:
-            self._job_complete_callback(element, queue, job.action_name, success)
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -400,18 +471,20 @@ class Scheduler():
         wait_start = datetime.datetime.now()
         wait_limit = 20.0
 
-        # First tell all jobs to terminate
+        active_jobs = self.active_jobs
         for queue in self.queues:
-            for job in queue.active_jobs:
-                job.terminate()
+            active_jobs.extend(queue.active_jobs)
+
+        # First tell all jobs to terminate
+        for job in active_jobs:
+            job.terminate()
 
         # Now wait for them to really terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                elapsed = datetime.datetime.now() - wait_start
-                timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
-                if not job.terminate_wait(timeout):
-                    job.kill()
+        for job in active_jobs:
+            elapsed = datetime.datetime.now() - wait_start
+            timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+            if not job.terminate_wait(timeout):
+                job.kill()
 
         self.loop.stop()