You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ak...@apache.org on 2022/07/26 14:40:19 UTC

[buildstream] 02/03: job.py: merge ElementJob into Job

This is an automated email from the ASF dual-hosted git repository.

akitouni pushed a commit to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit c25fae052d726afa0056e90fa3eb50bf9189c95e
Author: Abderrahim Kitouni <ab...@codethink.co.uk>
AuthorDate: Tue Jul 26 14:25:48 2022 +0200

    job.py: merge ElementJob into Job
    
    ElementJob was the only subclass, since the other jobs are now taken care of
    by buildbox-casd.
---
 src/buildstream/_scheduler/__init__.py        |   2 +-
 src/buildstream/_scheduler/jobs/__init__.py   |   3 +-
 src/buildstream/_scheduler/jobs/elementjob.py |  82 ------------------
 src/buildstream/_scheduler/jobs/job.py        | 118 ++++++++++----------------
 src/buildstream/_scheduler/queues/queue.py    |   5 +-
 5 files changed, 47 insertions(+), 163 deletions(-)

diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d014c1788..c6dbe3642 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -27,4 +27,4 @@ from .queues.pullqueue import PullQueue
 from .queues.cachequeryqueue import CacheQueryQueue
 
 from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob, JobStatus
+from .jobs import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
index 3de09a475..68dedc8b0 100644
--- a/src/buildstream/_scheduler/jobs/__init__.py
+++ b/src/buildstream/_scheduler/jobs/__init__.py
@@ -16,5 +16,4 @@
 #  Authors:
 #        Tristan Maat <tr...@codethink.co.uk>
 
-from .elementjob import ElementJob
-from .job import JobStatus
+from .job import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
deleted file mode 100644
index 28bb65d80..000000000
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ /dev/null
@@ -1,82 +0,0 @@
-#  Copyright (C) 2018 Codethink Limited
-#
-#  Licensed under the Apache License, Version 2.0 (the "License");
-#  you may not use this file except in compliance with the License.
-#  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-#
-#  Author:
-#        Tristan Daniël Maat <tr...@codethink.co.uk>
-#
-
-from .job import Job
-
-
-# ElementJob()
-#
-# A job to run an element's commands. When this job is started
-# `action_cb` will be called, and when it completes `complete_cb` will
-# be called.
-#
-# Args:
-#    scheduler (Scheduler): The scheduler
-#    action_name (str): The queue action name
-#    max_retries (int): The maximum number of retries
-#    action_cb (callable): The function to execute on the child
-#    complete_cb (callable): The function to execute when the job completes
-#    element (Element): The element to work on
-#    kwargs: Remaining Job() constructor arguments
-#
-# Here is the calling signature of the action_cb:
-#
-#     action_cb():
-#
-#     This function will be called in the child task
-#
-#     Args:
-#        element (Element): The element passed to the Job() constructor
-#
-#     Returns:
-#        (object): Any abstract simple python object, including a string, int,
-#                  bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-#     complete_cb():
-#
-#     This function will be called when the child task completes
-#
-#     Args:
-#        job (Job): The job object which completed
-#        element (Element): The element passed to the Job() constructor
-#        status (JobStatus): The status of whether the workload raised an exception
-#        result (object): The deserialized object returned by the `action_cb`, or None
-#                         if `success` is False
-#
-class ElementJob(Job):
-    def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
-        super().__init__(*args, **kwargs)
-        self.set_name(element._get_full_name())
-        self.queue = queue
-        self._element = element  # Set the Element pertaining to the job
-        self._action_cb = action_cb  # The action callable function
-        self._complete_cb = complete_cb  # The complete callable function
-
-        # Set the plugin element name & key for logging purposes
-        self.set_message_element_name(self.name)
-        self.set_message_element_key(self._element._get_display_key())
-
-    def parent_complete(self, status, result):
-        self._complete_cb(self, self._element, status, self._result)
-
-    def child_process(self):
-
-        # Run the action
-        return self._action_cb(self._element)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c57f8d29f..90fa1765c 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -72,21 +72,52 @@ class JobStatus(FastEnum):
 #    action_name (str): The queue action name
 #    logfile (str): A template string that points to the logfile
 #                   that should be used - should contain {pid}.
+#    element (Element): The element to work on
+#    action_cb (callable): The function to execute on the child
+#    complete_cb (callable): The function to execute when the job completes
 #    max_retries (int): The maximum number of retries
 #
+# Here is the calling signature of the action_cb:
+#
+#     action_cb():
+#
+#     This function will be called in the child task
+#
+#     Args:
+#        element (Element): The element passed to the Job() constructor
+#
+#     Returns:
+#        (object): Any abstract simple python object, including a string, int,
+#                  bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+#     complete_cb():
+#
+#     This function will be called when the child task completes
+#
+#     Args:
+#        job (Job): The job object which completed
+#        element (Element): The element passed to the Job() constructor
+#        status (JobStatus): The status of whether the workload raised an exception
+#        result (object): The deserialized object returned by the `action_cb`, or None
+#                         if `success` is False
+#
+
+
 class Job:
     # Unique id generator for jobs
     #
     # This is used to identify tasks in the `State` class
     _id_generator = itertools.count(1)
 
-    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+    def __init__(self, scheduler, action_name, logfile, element, action_cb, complete_cb, max_retries=0):
 
         #
         # Public members
         #
         self.id = "{}-{}".format(action_name, next(Job._id_generator))
-        self.name = None  # The name of the job, set by the job's subclass
+        self.name = element._get_full_name()
         self.action_name = action_name  # The action name for the Queue
 
         #
@@ -101,9 +132,11 @@ class Job:
         self._terminated = False  # Whether this job has been explicitly terminated
 
         self._logfile = logfile
-        self._message_element_name = None  # The task-wide element name
-        self._message_element_key = None  # The task-wide element cache key
-        self._element = None  # The Element() passed to the Job() constructor, if applicable
+        self._element_name = element._get_full_name()  # The task-wide element name
+        self._element_key = element._get_display_key()  # The task-wide element cache key
+        self._element = element  # Set the Element pertaining to the job
+        self._action_cb = action_cb  # The action callable function
+        self._complete_cb = complete_cb  # The complete callable function
 
         self._task = None  # The task that is run
 
@@ -111,12 +144,6 @@ class Job:
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
 
-    # set_name()
-    #
-    # Sets the name of this job
-    def set_name(self, name):
-        self.name = name
-
     # start()
     #
     # Starts the job.
@@ -168,28 +195,6 @@ class Job:
     def get_terminated(self):
         return self._terminated
 
-    # set_message_element_name()
-    #
-    # This is called by Job subclasses to set the plugin instance element
-    # name issuing the message (if an element is related to the Job).
-    #
-    # Args:
-    #     element_name (int): The element_name to be supplied to the Message() constructor
-    #
-    def set_message_element_name(self, element_name):
-        self._message_element_name = element_name
-
-    # set_message_element_key()
-    #
-    # This is called by Job subclasses to set the element
-    # key for for the issuing message (if an element is related to the Job).
-    #
-    # Args:
-    #     element_key (_DisplayKey): The element_key tuple to be supplied to the Message() constructor
-    #
-    def set_message_element_key(self, element_key):
-        self._message_element_key = element_key
-
     # message():
     #
     # Logs a message, this will be logged in the task's logfile and
@@ -204,18 +209,13 @@ class Job:
     def message(self, message_type, message, **kwargs):
         kwargs["scheduler"] = True
         message = Message(
-            message_type,
-            message,
-            element_name=self._message_element_name,
-            element_key=self._message_element_key,
-            **kwargs
+            message_type, message, element_name=self._element_name, element_key=self._element_key, **kwargs
         )
         self._messenger.message(message)
 
     # get_element()
     #
-    # Get the Element() related to the job, if jobtype (i.e ElementJob) is
-    # applicable, default None.
+    # Get the Element() related to the job
     #
     # Returns:
     #     (Element): The Element() instance pertaining to the Job, else None.
@@ -223,36 +223,6 @@ class Job:
     def get_element(self):
         return self._element
 
-    #######################################################
-    #                  Abstract Methods                   #
-    #######################################################
-
-    # child_process()
-    #
-    # This will be executed after starting the child process, and is intended
-    # to perform the job's task.
-    #
-    # Returns:
-    #    (any): A simple object (must be pickle-able, i.e. strings, lists,
-    #           dicts, numbers, but not Element instances). It is returned to
-    #           the parent Job running in the main process. This is taken as
-    #           the result of the Job.
-    #
-    def child_process(self):
-        raise ImplError("Job '{kind}' does not implement child_process()".format(kind=type(self).__name__))
-
-    # parent_complete()
-    #
-    # This will be executed in the main process after the job finishes, and is
-    # expected to pass the result to the main thread.
-    #
-    # Args:
-    #    status (JobStatus): The job exit status
-    #    result (any): The result returned by child_process().
-    #
-    def parent_complete(self, status, result):
-        raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -301,7 +271,7 @@ class Job:
         else:
             status = JobStatus.FAIL
 
-        self.parent_complete(status, self._result)
+        self._complete_cb(self, self._element, status, self._result)
         self._scheduler.job_completed(self, status)
         self._task = None
 
@@ -312,9 +282,7 @@ class Job:
     def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._messenger.setup_new_action_context(
-            self.action_name, self._message_element_name, self._message_element_key
-        )
+        self._messenger.setup_new_action_context(self.action_name, self._element_name, self._element_key)
 
         # Time, log and and run the action function
         #
@@ -331,7 +299,7 @@ class Job:
 
                 try:
                     # Try the task action
-                    result = self.child_process()  # pylint: disable=assignment-from-no-return
+                    result = self._action_cb(self._element)
                 except SkipJob as e:
                     elapsed = datetime.datetime.now() - timeinfo.start_time
                     self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 76688e75d..110f14944 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ import traceback
 from typing import TYPE_CHECKING
 
 # Local imports
-from ..jobs import ElementJob, JobStatus
+from ..jobs import Job, JobStatus
 from ..resources import ResourceType
 
 # BuildStream toplevel imports
@@ -232,12 +232,11 @@ class Queue:
             ready.append(element)
 
         return [
-            ElementJob(
+            Job(
                 self._scheduler,
                 self.action_name,
                 self._element_log_path(element),
                 element=element,
-                queue=self,
                 action_cb=self.get_process_func(),
                 complete_cb=self._job_done,
                 max_retries=self._max_retries,