You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ak...@apache.org on 2022/07/26 14:40:17 UTC

[buildstream] branch abderrahim/simplify-jobs created (now 15c57fd45)

This is an automated email from the ASF dual-hosted git repository.

akitouni pushed a change to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git


      at 15c57fd45 job.py: replace the jobs module

This branch includes the following new commits:

     new a16905170 job.py: merge ChildJob into Job
     new c25fae052 job.py: merge ElementJob into Job
     new 15c57fd45 job.py: replace the jobs module

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/03: job.py: merge ChildJob into Job

Posted by ak...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

akitouni pushed a commit to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a1690517044a1f27e0e0d0eb6fb0b78750d129f4
Author: Abderrahim Kitouni <ab...@codethink.co.uk>
AuthorDate: Mon Jul 25 15:00:29 2022 +0200

    job.py: merge ChildJob into Job
    
    We no longer need the parent/child separation
---
 src/buildstream/_scheduler/jobs/elementjob.py |  12 +-
 src/buildstream/_scheduler/jobs/job.py        | 186 +++++---------------------
 2 files changed, 31 insertions(+), 167 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
index 3d04beba9..28bb65d80 100644
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ b/src/buildstream/_scheduler/jobs/elementjob.py
@@ -16,7 +16,7 @@
 #        Tristan Daniël Maat <tr...@codethink.co.uk>
 #
 
-from .job import Job, ChildJob
+from .job import Job
 
 
 # ElementJob()
@@ -76,16 +76,6 @@ class ElementJob(Job):
     def parent_complete(self, status, result):
         self._complete_cb(self, self._element, status, self._result)
 
-    def create_child_job(self, *args, **kwargs):
-        return ChildElementJob(*args, element=self._element, action_cb=self._action_cb, **kwargs)
-
-
-class ChildElementJob(ChildJob):
-    def __init__(self, *args, element, action_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self._element = element
-        self._action_cb = action_cb
-
     def child_process(self):
 
         # Run the action
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 08bdbcd9d..c57f8d29f 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -67,23 +67,6 @@ class JobStatus(FastEnum):
 # process. It has some methods that are not implemented - they are meant for
 # you to implement in a subclass.
 #
-# It has a close relationship with the ChildJob class, and it can be considered
-# a two part solution:
-#
-# 1. A Job instance, which will create a ChildJob instance and arrange for
-#    childjob.child_process() to be executed in another process.
-# 2. The created ChildJob instance, which does the actual work.
-#
-# This split makes it clear what data is passed to the other process and what
-# is executed in which process.
-#
-# To set up a minimal new kind of Job, e.g. YourJob:
-#
-# 1. Create a YourJob class, inheriting from Job.
-# 2. Create a YourChildJob class, inheriting from ChildJob.
-# 3. Implement YourJob.create_child_job() and YourJob.parent_complete().
-# 4. Implement YourChildJob.child_process().
-#
 # Args:
 #    scheduler (Scheduler): The scheduler
 #    action_name (str): The queue action name
@@ -123,7 +106,10 @@ class Job:
         self._element = None  # The Element() passed to the Job() constructor, if applicable
 
         self._task = None  # The task that is run
-        self._child = None
+
+        self._thread_id = None  # Thread in which the child executes its action
+        self._should_terminate = False
+        self._terminate_lock = threading.Lock()
 
     # set_name()
     #
@@ -141,22 +127,10 @@ class Job:
 
         self._tries += 1
 
-        # FIXME: remove the parent/child separation, it's not needed anymore.
-        self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
-            self.action_name,
-            self._messenger,
-            self._scheduler.context.logdir,
-            self._logfile,
-            self._max_retries,
-            self._tries,
-            self._message_element_name,
-            self._message_element_key,
-        )
-
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
+            ret_code, self._result = await loop.run_in_executor(None, self.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -171,8 +145,17 @@ class Job:
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         if self._task:
-            self._child.terminate()
+            assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler"
+
+            if self._should_terminate:
+                return
+
+            with self._terminate_lock:
+                self._should_terminate = True
+                if self._thread_id is None:
+                    return
 
+            terminate_thread(self._thread_id)
         self._terminated = True
 
     # get_terminated()
@@ -244,6 +227,20 @@ class Job:
     #                  Abstract Methods                   #
     #######################################################
 
+    # child_process()
+    #
+    # This will be executed after starting the child process, and is intended
+    # to perform the job's task.
+    #
+    # Returns:
+    #    (any): A simple object (must be pickle-able, i.e. strings, lists,
+    #           dicts, numbers, but not Element instances). It is returned to
+    #           the parent Job running in the main process. This is taken as
+    #           the result of the Job.
+    #
+    def child_process(self):
+        raise ImplError("Job '{kind}' does not implement child_process()".format(kind=type(self).__name__))
+
     # parent_complete()
     #
     # This will be executed in the main process after the job finishes, and is
@@ -256,24 +253,6 @@ class Job:
     def parent_complete(self, status, result):
         raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
 
-    # create_child_job()
-    #
-    # Called by a Job instance to create a child job.
-    #
-    # The child job object is an instance of a subclass of ChildJob.
-    #
-    # The child job object's child_process() method will be executed in another
-    # process, so that work is done in parallel. See the documentation for the
-    # Job class for more information on this relationship.
-    #
-    # This method must be overridden by Job subclasses.
-    #
-    # Returns:
-    #    (ChildJob): An instance of a subclass of ChildJob.
-    #
-    def create_child_job(self, *args, **kwargs):
-        raise ImplError("Job '{kind}' does not implement create_child_job()".format(kind=type(self).__name__))
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -326,92 +305,6 @@ class Job:
         self._scheduler.job_completed(self, status)
         self._task = None
 
-
-# ChildJob()
-#
-# The ChildJob object represents the part of a parallel task that will run in a
-# separate process. It has a close relationship with the parent Job that
-# created it.
-#
-# See the documentation of the Job class for more on their relationship, and
-# how to set up a (Job, ChildJob pair).
-#
-# The args below are passed from the parent Job to the ChildJob.
-#
-# Args:
-#    scheduler (Scheduler): The scheduler.
-#    action_name (str): The queue action name.
-#    logfile (str): A template string that points to the logfile
-#                   that should be used - should contain {pid}.
-#    max_retries (int): The maximum number of retries.
-#    tries (int): The number of retries so far.
-#    message_element_name (str): None, or the plugin instance element name
-#                                to be supplied to the Message() constructor.
-#    message_element_key (tuple): None, or the element display key tuple
-#                                to be supplied to the Message() constructor.
-#
-class ChildJob:
-    def __init__(
-        self, action_name, messenger, logdir, logfile, max_retries, tries, message_element_name, message_element_key
-    ):
-
-        self.action_name = action_name
-
-        self._messenger = messenger
-        self._logdir = logdir
-        self._logfile = logfile
-        self._max_retries = max_retries
-        self._tries = tries
-        self._message_element_name = message_element_name
-        self._message_element_key = message_element_key
-
-        self._thread_id = None  # Thread in which the child executes its action
-        self._should_terminate = False
-        self._terminate_lock = threading.Lock()
-
-    # message():
-    #
-    # Logs a message, this will be logged in the task's logfile and
-    # conditionally also be sent to the frontend.
-    #
-    # Args:
-    #    message_type (MessageType): The type of message to send
-    #    message (str): The message
-    #    kwargs: Remaining Message() constructor arguments, note
-    #            element_key is set in _child_message_handler
-    #            for front end display if not already set or explicitly
-    #            overriden here.
-    #
-    def message(self, message_type, message, **kwargs):
-        kwargs["scheduler"] = True
-        self._messenger.message(
-            Message(
-                message_type,
-                message,
-                element_name=self._message_element_name,
-                element_key=self._message_element_key,
-                **kwargs
-            )
-        )
-
-    #######################################################
-    #                  Abstract Methods                   #
-    #######################################################
-
-    # child_process()
-    #
-    # This will be executed after starting the child process, and is intended
-    # to perform the job's task.
-    #
-    # Returns:
-    #    (any): A simple object (must be pickle-able, i.e. strings, lists,
-    #           dicts, numbers, but not Element instances). It is returned to
-    #           the parent Job running in the main process. This is taken as
-    #           the result of the Job.
-    #
-    def child_process(self):
-        raise ImplError("ChildJob '{kind}' does not implement child_process()".format(kind=type(self).__name__))
-
     # child_action()
     #
     # Perform the action in the child process, this calls the action_cb.
@@ -426,7 +319,7 @@ class ChildJob:
         # Time, log and and run the action function
         #
         with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
-            self._logfile, self._logdir
+            self._logfile, self._scheduler.context.logdir
         ) as filename:
             try:
                 self.message(MessageType.START, self.action_name, logfile=filename)
@@ -499,22 +392,3 @@ class ChildJob:
             except TerminateException:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
-
-    # terminate()
-    #
-    # Ask the the current child thread to terminate
-    #
-    # This should only ever be called from the main thread.
-    #
-    def terminate(self):
-        assert utils._is_in_main_thread(), "Terminating the job's thread should only be done from the scheduler"
-
-        if self._should_terminate:
-            return
-
-        with self._terminate_lock:
-            self._should_terminate = True
-            if self._thread_id is None:
-                return
-
-        terminate_thread(self._thread_id)


[buildstream] 02/03: job.py: merge ElementJob into Job

Posted by ak...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

akitouni pushed a commit to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c25fae052d726afa0056e90fa3eb50bf9189c95e
Author: Abderrahim Kitouni <ab...@codethink.co.uk>
AuthorDate: Tue Jul 26 14:25:48 2022 +0200

    job.py: merge ElementJob into Job
    
    ElementJob was the only subclass, since the other jobs are now taken care of
    by buildbox-casd
---
 src/buildstream/_scheduler/__init__.py        |   2 +-
 src/buildstream/_scheduler/jobs/__init__.py   |   3 +-
 src/buildstream/_scheduler/jobs/elementjob.py |  82 ------------------
 src/buildstream/_scheduler/jobs/job.py        | 118 ++++++++++----------------
 src/buildstream/_scheduler/queues/queue.py    |   5 +-
 5 files changed, 47 insertions(+), 163 deletions(-)

diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d014c1788..c6dbe3642 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -27,4 +27,4 @@ from .queues.pullqueue import PullQueue
 from .queues.cachequeryqueue import CacheQueryQueue
 
 from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob, JobStatus
+from .jobs import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
index 3de09a475..68dedc8b0 100644
--- a/src/buildstream/_scheduler/jobs/__init__.py
+++ b/src/buildstream/_scheduler/jobs/__init__.py
@@ -16,5 +16,4 @@
 #  Authors:
 #        Tristan Maat <tr...@codethink.co.uk>
 
-from .elementjob import ElementJob
-from .job import JobStatus
+from .job import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
deleted file mode 100644
index 28bb65d80..000000000
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#  Copyright (C) 2018 Codethink Limited
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-#  Author:
-#        Tristan Daniël Maat <tr...@codethink.co.uk>
-#
-
-from .job import Job
-
-
-# ElementJob()
-#
-# A job to run an element's commands. When this job is started
-# `action_cb` will be called, and when it completes `complete_cb` will
-# be called.
-#
-# Args:
-#    scheduler (Scheduler): The scheduler
-#    action_name (str): The queue action name
-#    max_retries (int): The maximum number of retries
-#    action_cb (callable): The function to execute on the child
-#    complete_cb (callable): The function to execute when the job completes
-#    element (Element): The element to work on
-#    kwargs: Remaining Job() constructor arguments
-#
-# Here is the calling signature of the action_cb:
-#
-#     action_cb():
-#
-#     This function will be called in the child task
-#
-#     Args:
-#        element (Element): The element passed to the Job() constructor
-#
-#     Returns:
-#        (object): Any abstract simple python object, including a string, int,
-#                  bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-#     complete_cb():
-#
-#     This function will be called when the child task completes
-#
-#     Args:
-#        job (Job): The job object which completed
-#        element (Element): The element passed to the Job() constructor
-#        status (JobStatus): The status of whether the workload raised an exception
-#        result (object): The deserialized object returned by the `action_cb`, or None
-#                         if `success` is False
-#
-class ElementJob(Job):
-    def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.set_name(element._get_full_name())
-        self.queue = queue
-        self._element = element  # Set the Element pertaining to the job
-        self._action_cb = action_cb  # The action callable function
-        self._complete_cb = complete_cb  # The complete callable function
-
-        # Set the plugin element name & key for logging purposes
-        self.set_message_element_name(self.name)
-        self.set_message_element_key(self._element._get_display_key())
-
-    def parent_complete(self, status, result):
-        self._complete_cb(self, self._element, status, self._result)
-
-    def child_process(self):
-
-        # Run the action
-        return self._action_cb(self._element)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c57f8d29f..90fa1765c 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -72,21 +72,52 @@ class JobStatus(FastEnum):
 #    action_name (str): The queue action name
 #    logfile (str): A template string that points to the logfile
 #                   that should be used - should contain {pid}.
+#    element (Element): The element to work on
+#    action_cb (callable): The function to execute on the child
+#    complete_cb (callable): The function to execute when the job completes
 #    max_retries (int): The maximum number of retries
 #
+# Here is the calling signature of the action_cb:
+#
+#     action_cb():
+#
+#     This function will be called in the child task
+#
+#     Args:
+#        element (Element): The element passed to the Job() constructor
+#
+#     Returns:
+#        (object): Any abstract simple python object, including a string, int,
+#                  bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+#     complete_cb():
+#
+#     This function will be called when the child task completes
+#
+#     Args:
+#        job (Job): The job object which completed
+#        element (Element): The element passed to the Job() constructor
+#        status (JobStatus): The status of whether the workload raised an exception
+#        result (object): The deserialized object returned by the `action_cb`, or None
+#                         if `success` is False
+#
+
+
 class Job:
     # Unique id generator for jobs
     #
     # This is used to identify tasks in the `State` class
     _id_generator = itertools.count(1)
 
-    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+    def __init__(self, scheduler, action_name, logfile, element, action_cb, complete_cb, max_retries=0):
 
         #
         # Public members
         #
         self.id = "{}-{}".format(action_name, next(Job._id_generator))
-        self.name = None  # The name of the job, set by the job's subclass
+        self.name = element._get_full_name()
         self.action_name = action_name  # The action name for the Queue
 
         #
@@ -101,9 +132,11 @@ class Job:
         self._terminated = False  # Whether this job has been explicitly terminated
 
         self._logfile = logfile
-        self._message_element_name = None  # The task-wide element name
-        self._message_element_key = None  # The task-wide element cache key
-        self._element = None  # The Element() passed to the Job() constructor, if applicable
+        self._element_name = element._get_full_name()  # The task-wide element name
+        self._element_key = element._get_display_key()  # The task-wide element cache key
+        self._element = element  # Set the Element pertaining to the job
+        self._action_cb = action_cb  # The action callable function
+        self._complete_cb = complete_cb  # The complete callable function
 
         self._task = None  # The task that is run
 
@@ -111,12 +144,6 @@ class Job:
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
 
-    # set_name()
-    #
-    # Sets the name of this job
-    def set_name(self, name):
-        self.name = name
-
     # start()
     #
     # Starts the job.
@@ -168,28 +195,6 @@ class Job:
     def get_terminated(self):
         return self._terminated
 
-    # set_message_element_name()
-    #
-    # This is called by Job subclasses to set the plugin instance element
-    # name issuing the message (if an element is related to the Job).
-    #
-    # Args:
-    #     element_name (int): The element_name to be supplied to the Message() constructor
-    #
-    def set_message_element_name(self, element_name):
-        self._message_element_name = element_name
-
-    # set_message_element_key()
-    #
-    # This is called by Job subclasses to set the element
-    # key for for the issuing message (if an element is related to the Job).
-    #
-    # Args:
-    #     element_key (_DisplayKey): The element_key tuple to be supplied to the Message() constructor
-    #
-    def set_message_element_key(self, element_key):
-        self._message_element_key = element_key
-
     # message():
     #
     # Logs a message, this will be logged in the task's logfile and
@@ -204,18 +209,13 @@ class Job:
     def message(self, message_type, message, **kwargs):
         kwargs["scheduler"] = True
         message = Message(
-            message_type,
-            message,
-            element_name=self._message_element_name,
-            element_key=self._message_element_key,
-            **kwargs
+            message_type, message, element_name=self._element_name, element_key=self._element_key, **kwargs
         )
         self._messenger.message(message)
 
     # get_element()
     #
-    # Get the Element() related to the job, if jobtype (i.e ElementJob) is
-    # applicable, default None.
+    # Get the Element() related to the job
     #
     # Returns:
     #     (Element): The Element() instance pertaining to the Job, else None.
@@ -223,36 +223,6 @@ class Job:
     def get_element(self):
         return self._element
 
-    #######################################################
-    #                  Abstract Methods                   #
-    #######################################################
-
-    # child_process()
-    #
-    # This will be executed after starting the child process, and is intended
-    # to perform the job's task.
-    #
-    # Returns:
-    #    (any): A simple object (must be pickle-able, i.e. strings, lists,
-    #           dicts, numbers, but not Element instances). It is returned to
-    #           the parent Job running in the main process. This is taken as
-    #           the result of the Job.
-    #
-    def child_process(self):
-        raise ImplError("Job '{kind}' does not implement child_process()".format(kind=type(self).__name__))
-
-    # parent_complete()
-    #
-    # This will be executed in the main process after the job finishes, and is
-    # expected to pass the result to the main thread.
-    #
-    # Args:
-    #    status (JobStatus): The job exit status
-    #    result (any): The result returned by child_process().
-    #
-    def parent_complete(self, status, result):
-        raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -301,7 +271,7 @@ class Job:
         else:
             status = JobStatus.FAIL
 
-        self.parent_complete(status, self._result)
+        self._complete_cb(self, self._element, status, self._result)
         self._scheduler.job_completed(self, status)
         self._task = None
 
@@ -312,9 +282,7 @@ class Job:
     def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._messenger.setup_new_action_context(
-            self.action_name, self._message_element_name, self._message_element_key
-        )
+        self._messenger.setup_new_action_context(self.action_name, self._element_name, self._element_key)
 
         # Time, log and and run the action function
         #
@@ -331,7 +299,7 @@ class Job:
 
                 try:
                     # Try the task action
-                    result = self.child_process()  # pylint: disable=assignment-from-no-return
+                    result = self._action_cb(self._element)
                 except SkipJob as e:
                     elapsed = datetime.datetime.now() - timeinfo.start_time
                     self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 76688e75d..110f14944 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ import traceback
 from typing import TYPE_CHECKING
 
 # Local imports
-from ..jobs import ElementJob, JobStatus
+from ..jobs import Job, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
@@ -232,12 +232,11 @@ class Queue:
             ready.append(element)
 
         return [
-            ElementJob(
+            Job(
                 self._scheduler,
                 self.action_name,
                 self._element_log_path(element),
                 element=element,
-                queue=self,
                 action_cb=self.get_process_func(),
                 complete_cb=self._job_done,
                 max_retries=self._max_retries,


[buildstream] 03/03: job.py: replace the jobs module

Posted by ak...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

akitouni pushed a commit to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 15c57fd45516c15334d31f50de0ea783796f95aa
Author: Abderrahim Kitouni <ab...@codethink.co.uk>
AuthorDate: Tue Jul 26 16:27:05 2022 +0200

    job.py: replace the jobs module
    
    This is the only package remaining in the module
---
 src/buildstream/_scheduler/__init__.py               |  2 +-
 src/buildstream/_scheduler/{jobs => }/job.py         | 12 ++++++------
 src/buildstream/_scheduler/jobs/__init__.py          | 19 -------------------
 src/buildstream/_scheduler/queues/buildqueue.py      |  2 +-
 src/buildstream/_scheduler/queues/cachequeryqueue.py |  2 +-
 src/buildstream/_scheduler/queues/fetchqueue.py      |  2 +-
 src/buildstream/_scheduler/queues/pullqueue.py       |  2 +-
 src/buildstream/_scheduler/queues/queue.py           |  2 +-
 src/buildstream/_scheduler/queues/trackqueue.py      |  2 +-
 src/buildstream/_scheduler/scheduler.py              |  2 +-
 10 files changed, 14 insertions(+), 33 deletions(-)

diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index c6dbe3642..19568b795 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -27,4 +27,4 @@ from .queues.pullqueue import PullQueue
 from .queues.cachequeryqueue import CacheQueryQueue
 
 from .scheduler import Scheduler, SchedStatus
-from .jobs import Job, JobStatus
+from .job import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/job.py
similarity index 98%
rename from src/buildstream/_scheduler/jobs/job.py
rename to src/buildstream/_scheduler/job.py
index 90fa1765c..c81b2e73d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/job.py
@@ -27,12 +27,12 @@ import threading
 import traceback
 
 # BuildStream toplevel imports
-from ... import utils
-from ..._utils import terminate_thread
-from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType
-from ...types import FastEnum
-from ..._signals import TerminateException
+from .. import utils
+from .._utils import terminate_thread
+from .._exceptions import ImplError, BstError, set_last_task_error, SkipJob
+from .._message import Message, MessageType
+from ..types import FastEnum
+from .._signals import TerminateException
 
 
 # Return code values shutdown of job handling child processes
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
deleted file mode 100644
index 68dedc8b0..000000000
--- a/src/buildstream/_scheduler/jobs/__init__.py
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-#  Copyright (C) 2018 Codethink Limited
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-#  Authors:
-#        Tristan Maat <tr...@codethink.co.uk>
-
-from .job import Job, JobStatus
diff --git a/src/buildstream/_scheduler/queues/buildqueue.py b/src/buildstream/_scheduler/queues/buildqueue.py
index 5a6ca75bc..e014864ce 100644
--- a/src/buildstream/_scheduler/queues/buildqueue.py
+++ b/src/buildstream/_scheduler/queues/buildqueue.py
@@ -19,7 +19,7 @@
 
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
+from ..job import JobStatus
 
 
 # A queue which assembles elements
diff --git a/src/buildstream/_scheduler/queues/cachequeryqueue.py b/src/buildstream/_scheduler/queues/cachequeryqueue.py
index b650a91b8..5071cbd81 100644
--- a/src/buildstream/_scheduler/queues/cachequeryqueue.py
+++ b/src/buildstream/_scheduler/queues/cachequeryqueue.py
@@ -16,7 +16,7 @@
 
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
+from ..job import JobStatus
 from ...types import _KeyStrength
 
 
diff --git a/src/buildstream/_scheduler/queues/fetchqueue.py b/src/buildstream/_scheduler/queues/fetchqueue.py
index b6c915802..29bc6069d 100644
--- a/src/buildstream/_scheduler/queues/fetchqueue.py
+++ b/src/buildstream/_scheduler/queues/fetchqueue.py
@@ -20,7 +20,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
+from ..job import JobStatus
 
 
 # A queue which fetches element sources
diff --git a/src/buildstream/_scheduler/queues/pullqueue.py b/src/buildstream/_scheduler/queues/pullqueue.py
index 1ab095399..0b192ff4d 100644
--- a/src/buildstream/_scheduler/queues/pullqueue.py
+++ b/src/buildstream/_scheduler/queues/pullqueue.py
@@ -20,7 +20,7 @@
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
+from ..job import JobStatus
 from ..._exceptions import SkipJob
 
 
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 110f14944..034a781d6 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ import traceback
 from typing import TYPE_CHECKING
 
 # Local imports
-from ..jobs import Job, JobStatus
+from ..job import Job, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
diff --git a/src/buildstream/_scheduler/queues/trackqueue.py b/src/buildstream/_scheduler/queues/trackqueue.py
index 26e22eb10..f9ca0d65b 100644
--- a/src/buildstream/_scheduler/queues/trackqueue.py
+++ b/src/buildstream/_scheduler/queues/trackqueue.py
@@ -23,7 +23,7 @@ from ...plugin import Plugin
 # Local imports
 from . import Queue, QueueStatus
 from ..resources import ResourceType
-from ..jobs import JobStatus
+from ..job import JobStatus
 
 
 # A queue which tracks sources
diff --git a/src/buildstream/_scheduler/scheduler.py b/src/buildstream/_scheduler/scheduler.py
index 9d8e1c0f3..c8a78eb62 100644
--- a/src/buildstream/_scheduler/scheduler.py
+++ b/src/buildstream/_scheduler/scheduler.py
@@ -31,7 +31,7 @@ from concurrent.futures import ThreadPoolExecutor
 
 # Local imports
 from .resources import Resources
-from .jobs import JobStatus
+from .job import JobStatus
 from ..types import FastEnum
 from .._profile import Topics, PROFILER
 from ..plugin import Plugin