You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ak...@apache.org on 2022/07/26 14:40:19 UTC
[buildstream] 02/03: job.py: merge ElementJob into Job
This is an automated email from the ASF dual-hosted git repository.
akitouni pushed a commit to branch abderrahim/simplify-jobs
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit c25fae052d726afa0056e90fa3eb50bf9189c95e
Author: Abderrahim Kitouni <ab...@codethink.co.uk>
AuthorDate: Tue Jul 26 14:25:48 2022 +0200
job.py: merge ElementJob into Job
ElementJob was the only subclass of Job, since the other jobs are now taken care of
by buildbox-casd.
---
src/buildstream/_scheduler/__init__.py | 2 +-
src/buildstream/_scheduler/jobs/__init__.py | 3 +-
src/buildstream/_scheduler/jobs/elementjob.py | 82 ------------------
src/buildstream/_scheduler/jobs/job.py | 118 ++++++++++----------------
src/buildstream/_scheduler/queues/queue.py | 5 +-
5 files changed, 47 insertions(+), 163 deletions(-)
diff --git a/src/buildstream/_scheduler/__init__.py b/src/buildstream/_scheduler/__init__.py
index d014c1788..c6dbe3642 100644
--- a/src/buildstream/_scheduler/__init__.py
+++ b/src/buildstream/_scheduler/__init__.py
@@ -27,4 +27,4 @@ from .queues.pullqueue import PullQueue
from .queues.cachequeryqueue import CacheQueryQueue
from .scheduler import Scheduler, SchedStatus
-from .jobs import ElementJob, JobStatus
+from .jobs import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/__init__.py b/src/buildstream/_scheduler/jobs/__init__.py
index 3de09a475..68dedc8b0 100644
--- a/src/buildstream/_scheduler/jobs/__init__.py
+++ b/src/buildstream/_scheduler/jobs/__init__.py
@@ -16,5 +16,4 @@
# Authors:
# Tristan Maat <tr...@codethink.co.uk>
-from .elementjob import ElementJob
-from .job import JobStatus
+from .job import Job, JobStatus
diff --git a/src/buildstream/_scheduler/jobs/elementjob.py b/src/buildstream/_scheduler/jobs/elementjob.py
deleted file mode 100644
index 28bb65d80..000000000
--- a/src/buildstream/_scheduler/jobs/elementjob.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright (C) 2018 Codethink Limited
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Author:
-# Tristan Daniël Maat <tr...@codethink.co.uk>
-#
-
-from .job import Job
-
-
-# ElementJob()
-#
-# A job to run an element's commands. When this job is started
-# `action_cb` will be called, and when it completes `complete_cb` will
-# be called.
-#
-# Args:
-# scheduler (Scheduler): The scheduler
-# action_name (str): The queue action name
-# max_retries (int): The maximum number of retries
-# action_cb (callable): The function to execute on the child
-# complete_cb (callable): The function to execute when the job completes
-# element (Element): The element to work on
-# kwargs: Remaining Job() constructor arguments
-#
-# Here is the calling signature of the action_cb:
-#
-# action_cb():
-#
-# This function will be called in the child task
-#
-# Args:
-# element (Element): The element passed to the Job() constructor
-#
-# Returns:
-# (object): Any abstract simple python object, including a string, int,
-# bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-# complete_cb():
-#
-# This function will be called when the child task completes
-#
-# Args:
-# job (Job): The job object which completed
-# element (Element): The element passed to the Job() constructor
-# status (JobStatus): The status of whether the workload raised an exception
-# result (object): The deserialized object returned by the `action_cb`, or None
-# if `success` is False
-#
-class ElementJob(Job):
- def __init__(self, *args, element, queue, action_cb, complete_cb, **kwargs):
- super().__init__(*args, **kwargs)
- self.set_name(element._get_full_name())
- self.queue = queue
- self._element = element # Set the Element pertaining to the job
- self._action_cb = action_cb # The action callable function
- self._complete_cb = complete_cb # The complete callable function
-
- # Set the plugin element name & key for logging purposes
- self.set_message_element_name(self.name)
- self.set_message_element_key(self._element._get_display_key())
-
- def parent_complete(self, status, result):
- self._complete_cb(self, self._element, status, self._result)
-
- def child_process(self):
-
- # Run the action
- return self._action_cb(self._element)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index c57f8d29f..90fa1765c 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -72,21 +72,52 @@ class JobStatus(FastEnum):
# action_name (str): The queue action name
# logfile (str): A template string that points to the logfile
# that should be used - should contain {pid}.
+# element (Element): The element to work on
+# action_cb (callable): The function to execute on the child
+# complete_cb (callable): The function to execute when the job completes
# max_retries (int): The maximum number of retries
#
+# Here is the calling signature of the action_cb:
+#
+# action_cb():
+#
+# This function will be called in the child task
+#
+# Args:
+# element (Element): The element passed to the Job() constructor
+#
+# Returns:
+# (object): Any abstract simple python object, including a string, int,
+# bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+# complete_cb():
+#
+# This function will be called when the child task completes
+#
+# Args:
+# job (Job): The job object which completed
+# element (Element): The element passed to the Job() constructor
+# status (JobStatus): The status of whether the workload raised an exception
+# result (object): The deserialized object returned by the `action_cb`, or None
+# if `status` does not indicate success
+#
+
+
class Job:
# Unique id generator for jobs
#
# This is used to identify tasks in the `State` class
_id_generator = itertools.count(1)
- def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+ def __init__(self, scheduler, action_name, logfile, element, action_cb, complete_cb, max_retries=0):
#
# Public members
#
self.id = "{}-{}".format(action_name, next(Job._id_generator))
- self.name = None # The name of the job, set by the job's subclass
+ self.name = element._get_full_name()
self.action_name = action_name # The action name for the Queue
#
@@ -101,9 +132,11 @@ class Job:
self._terminated = False # Whether this job has been explicitly terminated
self._logfile = logfile
- self._message_element_name = None # The task-wide element name
- self._message_element_key = None # The task-wide element cache key
- self._element = None # The Element() passed to the Job() constructor, if applicable
+ self._element_name = element._get_full_name() # The task-wide element name
+ self._element_key = element._get_display_key() # The task-wide element cache key
+ self._element = element # Set the Element pertaining to the job
+ self._action_cb = action_cb # The action callable function
+ self._complete_cb = complete_cb # The complete callable function
self._task = None # The task that is run
@@ -111,12 +144,6 @@ class Job:
self._should_terminate = False
self._terminate_lock = threading.Lock()
- # set_name()
- #
- # Sets the name of this job
- def set_name(self, name):
- self.name = name
-
# start()
#
# Starts the job.
@@ -168,28 +195,6 @@ class Job:
def get_terminated(self):
return self._terminated
- # set_message_element_name()
- #
- # This is called by Job subclasses to set the plugin instance element
- # name issuing the message (if an element is related to the Job).
- #
- # Args:
- # element_name (int): The element_name to be supplied to the Message() constructor
- #
- def set_message_element_name(self, element_name):
- self._message_element_name = element_name
-
- # set_message_element_key()
- #
- # This is called by Job subclasses to set the element
- # key for for the issuing message (if an element is related to the Job).
- #
- # Args:
- # element_key (_DisplayKey): The element_key tuple to be supplied to the Message() constructor
- #
- def set_message_element_key(self, element_key):
- self._message_element_key = element_key
-
# message():
#
# Logs a message, this will be logged in the task's logfile and
@@ -204,18 +209,13 @@ class Job:
def message(self, message_type, message, **kwargs):
kwargs["scheduler"] = True
message = Message(
- message_type,
- message,
- element_name=self._message_element_name,
- element_key=self._message_element_key,
- **kwargs
+ message_type, message, element_name=self._element_name, element_key=self._element_key, **kwargs
)
self._messenger.message(message)
# get_element()
#
- # Get the Element() related to the job, if jobtype (i.e ElementJob) is
- # applicable, default None.
+ # Get the Element() related to the job
#
# Returns:
# (Element): The Element() instance pertaining to the Job.
@@ -223,36 +223,6 @@ class Job:
def get_element(self):
return self._element
- #######################################################
- # Abstract Methods #
- #######################################################
-
- # child_process()
- #
- # This will be executed after starting the child process, and is intended
- # to perform the job's task.
- #
- # Returns:
- # (any): A simple object (must be pickle-able, i.e. strings, lists,
- # dicts, numbers, but not Element instances). It is returned to
- # the parent Job running in the main process. This is taken as
- # the result of the Job.
- #
- def child_process(self):
- raise ImplError("Job '{kind}' does not implement child_process()".format(kind=type(self).__name__))
-
- # parent_complete()
- #
- # This will be executed in the main process after the job finishes, and is
- # expected to pass the result to the main thread.
- #
- # Args:
- # status (JobStatus): The job exit status
- # result (any): The result returned by child_process().
- #
- def parent_complete(self, status, result):
- raise ImplError("Job '{kind}' does not implement parent_complete()".format(kind=type(self).__name__))
-
#######################################################
# Local Private Methods #
#######################################################
@@ -301,7 +271,7 @@ class Job:
else:
status = JobStatus.FAIL
- self.parent_complete(status, self._result)
+ self._complete_cb(self, self._element, status, self._result)
self._scheduler.job_completed(self, status)
self._task = None
@@ -312,9 +282,7 @@ class Job:
def child_action(self):
# Set the global message handler in this child
# process to forward messages to the parent process
- self._messenger.setup_new_action_context(
- self.action_name, self._message_element_name, self._message_element_key
- )
+ self._messenger.setup_new_action_context(self.action_name, self._element_name, self._element_key)
# Time, log and run the action function
#
@@ -331,7 +299,7 @@ class Job:
try:
# Try the task action
- result = self.child_process() # pylint: disable=assignment-from-no-return
+ result = self._action_cb(self._element)
except SkipJob as e:
elapsed = datetime.datetime.now() - timeinfo.start_time
self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
diff --git a/src/buildstream/_scheduler/queues/queue.py b/src/buildstream/_scheduler/queues/queue.py
index 76688e75d..110f14944 100644
--- a/src/buildstream/_scheduler/queues/queue.py
+++ b/src/buildstream/_scheduler/queues/queue.py
@@ -25,7 +25,7 @@ import traceback
from typing import TYPE_CHECKING
# Local imports
-from ..jobs import ElementJob, JobStatus
+from ..jobs import Job, JobStatus
from ..resources import ResourceType
# BuildStream toplevel imports
@@ -232,12 +232,11 @@ class Queue:
ready.append(element)
return [
- ElementJob(
+ Job(
self._scheduler,
self.action_name,
self._element_log_path(element),
element=element,
- queue=self,
action_cb=self.get_process_func(),
complete_cb=self._job_done,
max_retries=self._max_retries,