You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:56:11 UTC
[buildstream] 02/04: buildstream/_scheduler/*: Make Jobs abstract
and element-independent
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ff7a00fdd755a9932f1f1aae937c944ef7f360f1
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Thu Apr 19 17:11:54 2018 +0100
buildstream/_scheduler/*: Make Jobs abstract and element-independent
---
buildstream/_scheduler/jobs/__init__.py | 2 +
buildstream/_scheduler/jobs/elementjob.py | 210 +++++++++++++++++++
buildstream/_scheduler/{ => jobs}/job.py | 324 ++++++++++++++++--------------
buildstream/_scheduler/queues/queue.py | 37 +++-
buildstream/_scheduler/scheduler.py | 8 +-
5 files changed, 417 insertions(+), 164 deletions(-)
diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 0000000..f215c39
--- /dev/null
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1,2 @@
+from .job import Job
+from .elementjob import ElementJob
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 0000000..379e2d1
--- /dev/null
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,210 @@
+# Copyright (C) 2018 Codethink Limited
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+# Author:
+# Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+from ...plugin import _plugin_lookup
+from ... import _signals
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+# scheduler (Scheduler): The scheduler
+# action_name (str): The queue action name
+# max_retries (int): The maximum number of retries
+# action_cb (callable): The function to execute on the child
+# complete_cb (callable): The function to execute when the job completes
+# element (Element): The element to work on
+#
+# Here is the calling signature of the action_cb:
+#
+# action_cb(element):
+#
+# This function will be called in the child task
+#
+# Args:
+# element (Element): The element passed to the Job() constructor
+#
+# Returns:
+# (object): Any abstract simple python object, including a string, int,
+# bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+# complete_cb(job, element, success, result):
+#
+# This function will be called when the child task completes
+#
+# Args:
+# job (Job): The job object which completed
+# element (Element): The element passed to the Job() constructor
+# success (bool): True if the action_cb did not raise an exception
+# result (object): The deserialized object returned by the `action_cb`, or None
+# if `success` is False
+#
+class ElementJob(Job):
+ def __init__(self, *args, element, action_cb, complete_cb, **kwargs):
+ super().__init__(*args, **kwargs)
+ self._element = element
+ self._action_cb = action_cb # The action callable function
+ self._complete_cb = complete_cb # The complete callable function
+
+ # _child_process()
+ #
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def _child_process(self):
+ return self._action_cb(self._element)
+
+ def _parent_complete(self, success, result):
+ self._complete_cb(self, self._element, success, self._result)
+
+ # _child_logging_enabled()
+ #
+ # Start the log for this job. This function will be given a
+ # template string for the path to a log file - this will contain
+ # "{pid}", which should be replaced with the current process'
+ # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+ #
+ # Args:
+ # logfile (str): A template string that points to the logfile
+ # that should be used - replace {pid} first.
+ #
+ # Yields:
+ # (str) The path to the logfile with {pid} replaced.
+ #
+ @contextmanager
+ def _child_logging_enabled(self, logfile):
+ self._logfile = logfile.format(pid=os.getpid())
+
+ with open(self._logfile, 'a') as log:
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently had something happening in the I/O stack
+ # then trying to reenter the I/O stack will fire a runtime error.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ # FIXME: Better logging
+
+ log.write('\n\nAction {} for element {} forcefully terminated\n'
+ .format(self.action_name, self._element.name))
+ log.flush()
+ except RuntimeError:
+ os.fsync(log.fileno())
+
+ self._element._set_log_handle(log)
+ with _signals.terminator(flush_log):
+ self._print_start_message(self._element, self._logfile)
+ yield self._logfile
+ self._element._set_log_handle(None)
+ self._logfile = None
+
+ # _message():
+ #
+ # Sends a message to the frontend
+ #
+ # Args:
+ # message_type (MessageType): The type of message to send
+ # message (str): The message
+ # kwargs: Remaining Message() constructor arguments
+ #
+ def _message(self, message_type, message, **kwargs):
+ args = dict(kwargs)
+ args['scheduler'] = True
+ self._scheduler.context.message(
+ Message(self._element._get_unique_id(),
+ message_type,
+ message,
+ **args))
+
+ def _print_start_message(self, element, logfile):
+ self._message(MessageType.START, self.action_name, logfile=logfile)
+
+ # Print the element's environment at the beginning of any element's log file.
+ #
+ # This should probably be omitted for non-build tasks but it's harmless here
+ elt_env = element.get_environment()
+ env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+ self._message(MessageType.LOG,
+ "Build environment for element {}".format(element.name),
+ detail=env_dump, logfile=logfile)
+
+ # _child_log()
+ #
+ # Log a message returned by the frontend's main message handler
+ # and return it to the main process.
+ #
+ # Arguments:
+ # message (Message): The message to log
+ #
+ # Returns:
+ # message (Message): A message object
+ #
+ def _child_log(self, message):
+ # Tag them on the way out the door...
+ message.action_name = self.action_name
+ message.task_id = self._element._get_unique_id()
+
+ # Use the plugin for the task for the output, not a plugin
+ # which might be acting on behalf of the task
+ plugin = _plugin_lookup(message.task_id)
+
+ with plugin._output_file() as output:
+ message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
+ output.write('{}\n'.format(message_text))
+ output.flush()
+
+ return message
+
+ # _child_process_data()
+ #
+ # Collect additional data that should be returned to the parent
+ # process; here, the element's serialized workspace, if it has
+ # one. Note that the job result is retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values later to be read by _process_sync_data
+ #
+ def _child_process_data(self):
+ data = {}
+
+ workspace = self._element._get_workspace()
+ if workspace is not None:
+ data['workspace'] = workspace.to_dict()
+
+ return data
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py
similarity index 69%
rename from buildstream/_scheduler/job.py
rename to buildstream/_scheduler/jobs/job.py
index b8b81f2..6728ae0 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -27,20 +27,21 @@ import datetime
import traceback
import asyncio
import multiprocessing
-from ruamel import yaml
+from contextlib import contextmanager
+
+import psutil
# BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType, unconditional_messages
-from ..plugin import _plugin_lookup
-from .. import _signals, utils
+from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._message import MessageType, unconditional_messages
+from ... import _signals, utils
# Used to distinguish between status messages and return values
class Envelope():
def __init__(self, message_type, message):
- self.message_type = message_type
- self.message = message
+ self._message_type = message_type
+ self._message = message
# Process class that doesn't call waitpid on its own.
@@ -55,54 +56,24 @@ class Process(multiprocessing.Process):
# Job()
#
# The Job object represents a parallel task, when calling Job.spawn(),
-# the given `action_cb` will be called in parallel to the calling process,
-# and `complete_cb` will be called with the action result in the calling
-# process when the job completes.
+# the given `Job._child_process` will be called in parallel to the
+# calling process, and `Job._parent_complete` will be called with the
+# action result in the calling process when the job completes.
#
# Args:
# scheduler (Scheduler): The scheduler
-# element (Element): The element to operate on
# action_name (str): The queue action name
-# action_cb (callable): The action function
-# complete_cb (callable): The function to call when complete
# max_retries (int): The maximum number of retries
#
-# Here is the calling signature of the action_cb:
-#
-# action_cb():
-#
-# This function will be called in the child task
-#
-# Args:
-# element (Element): The element passed to the Job() constructor
-#
-# Returns:
-# (object): Any abstract simple python object, including a string, int,
-# bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-# complete_cb():
-#
-# This function will be called when the child task completes
-#
-# Args:
-# job (Job): The job object which completed
-# element (Element): The element passed to the Job() constructor
-# success (bool): True if the action_cb did not raise an exception
-# result (object): The deserialized object returned by the `action_cb`, or None
-# if `success` is False
-#
class Job():
- def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
+ def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
#
# Public members
#
- self.element = element # The element we're processing
self.action_name = action_name # The action name for the Queue
- self.workspace_dict = None # A serialized Workspace object, after any modifications
+ self.child_data = None
#
# Private members
@@ -111,13 +82,12 @@ class Job():
self._queue = multiprocessing.Queue() # A message passing queue
self._process = None # The Process object
self._watcher = None # Child process watcher
- self._action_cb = action_cb # The action callable function
- self._complete_cb = complete_cb # The complete callable function
self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
self._tries = 0 # Try count, for retryable jobs
+ self._logfile = logfile
# spawn()
#
@@ -153,8 +123,7 @@ class Job():
# First resume the job if it's suspended
self.resume(silent=True)
- self._message(self.element, MessageType.STATUS,
- "{} terminating".format(self.action_name))
+ self._message(MessageType.STATUS, "{} terminating".format(self.action_name))
# Make sure there is no garbage on the queue
self._parent_stop_listening()
@@ -185,9 +154,15 @@ class Job():
def kill(self):
# Force kill
- self._message(self.element, MessageType.WARN,
+ self._message(MessageType.WARN,
"{} did not terminate gracefully, killing".format(self.action_name))
- utils._kill_process_tree(self._process.pid)
+
+ try:
+ utils._kill_process_tree(self._process.pid)
+ # This can happen if the process died of its own accord before
+ # we try to kill it
+ except psutil.NoSuchProcess:
+ return
# suspend()
#
@@ -195,7 +170,7 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self._message(self.element, MessageType.STATUS,
+ self._message(MessageType.STATUS,
"{} suspending".format(self.action_name))
try:
@@ -220,42 +195,152 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent:
- self._message(self.element, MessageType.STATUS,
+ self._message(MessageType.STATUS,
"{} resuming".format(self.action_name))
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
#######################################################
- # Local Private Methods #
+ # Abstract Methods #
#######################################################
+ # _parent_complete()
#
- # Methods prefixed with the word 'child' take place in the child process
+ # This will be executed after the job finishes, and is expected to
+ # pass the result to the main thread.
#
- # Methods prefixed with the word 'parent' take place in the parent process
+ # Args:
+ # success (bool): Whether the job was successful.
+ # result (any): The result returned by _child_process().
#
- # Other methods can be called in both child or parent processes
+ def _parent_complete(self, success, result):
+ raise ImplError("Job '{kind}' does not implement _parent_complete()"
+ .format(kind=type(self).__name__))
+
+ # _child_process()
#
- #######################################################
+ # This will be executed after fork(), and is intended to perform
+ # the job's task.
+ #
+ # Returns:
+ # (any): A (simple!) object to be returned to the main thread
+ # as the result.
+ #
+ def _child_process(self):
+ raise ImplError("Job '{kind}' does not implement _child_process()"
+ .format(kind=type(self).__name__))
+
+ # _child_logging_enabled()
+ #
+ # Start the log for this job. This function will be given a
+ # template string for the path to a log file - this will contain
+ # "{pid}", which should be replaced with the current process'
+ # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+ #
+ # Args:
+ # logfile (str): A template string that points to the logfile
+ # that should be used - replace {pid} first.
+ #
+ # Yields:
+ # (str) The path to the logfile with {pid} replaced.
+ #
+ @contextmanager
+ def _child_logging_enabled(self, logfile):
+ raise ImplError("Job '{kind}' does not implement _child_logging_enabled()"
+ .format(kind=type(self).__name__))
# _message():
#
# Sends a message to the frontend
#
# Args:
- # plugin (Plugin): The plugin to send a message for
# message_type (MessageType): The type of message to send
# message (str): The message
# kwargs: Remaining Message() constructor arguments
#
- def _message(self, plugin, message_type, message, **kwargs):
- args = dict(kwargs)
- args['scheduler'] = True
- self._scheduler.context.message(
- Message(plugin._get_unique_id(),
- message_type,
- message,
- **args))
+ def _message(self, message_type, message, **kwargs):
+ raise ImplError("Job '{kind}' does not implement _message()"
+ .format(kind=type(self).__name__))
+
+ # _child_process_data()
+ #
+ # Abstract method to retrieve additional data that should be
+ # returned to the parent process. Note that the job result is
+ # retrieved independently.
+ #
+ # Values can later be retrieved in Job.child_data.
+ #
+ # Returns:
+ # (dict) A dict containing values later to be read by _process_sync_data
+ #
+ def _child_process_data(self):
+ return {}
+
+ # _child_log()
+ #
+ # Log a message returned by the frontend's main message handler
+ # and return it to the main process.
+ #
+ # This method is also expected to add process-specific information
+ # to the message (notably, action_name and task_id).
+ #
+ # Arguments:
+ # message (Message): The message to log
+ #
+ # Returns:
+ # message (Message): A message object
+ #
+ def _child_log(self, message):
+ raise ImplError("Job '{kind}' does not implement _child_log()"
+ .format(kind=type(self).__name__))
+
+ #######################################################
+ # Local Private Methods #
+ #######################################################
+ #
+ # Methods prefixed with the word 'child' take place in the child process
+ #
+ # Methods prefixed with the word 'parent' take place in the parent process
+ #
+ # Other methods can be called in both child or parent processes
+ #
+ #######################################################
+
+ # _format_frontend_message()
+ #
+ # Format a message from the frontend for logging purposes. This
+ # will prepend a time code and add other information to help
+ # determine what happened.
+ #
+ # Args:
+ # message (Message) - The message to create a text from.
+ # name (str) - A name for the executing context.
+ #
+ # Returns:
+ # (str) The text to log.
+ #
+ def _format_frontend_message(self, message, name):
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+
+ detail = ''
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip('\n')
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ return template.format(timecode=timecode,
+ type=message.message_type.upper(),
+ name=name,
+ message=message.message,
+ detail=detail)
# _child_action()
#
@@ -266,7 +351,7 @@ class Job():
#
def _child_action(self, queue):
- element = self.element
+ logfile = self._logfile
# This avoids some SIGTSTP signals from grandchildren
# getting propagated up to the master process
@@ -302,35 +387,24 @@ class Job():
# Time, log and and run the action function
#
with _signals.suspendable(stop_time, resume_time), \
- element._logging_enabled(self.action_name) as filename:
-
- self._message(element, MessageType.START, self.action_name, logfile=filename)
-
- # Print the element's environment at the beginning of any element's log file.
- #
- # This should probably be omitted for non-build tasks but it's harmless here
- elt_env = element.get_environment()
- env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self._message(element, MessageType.LOG,
- "Build environment for element {}".format(element.name),
- detail=env_dump, logfile=filename)
+ self._child_logging_enabled(logfile) as filename:
try:
# Try the task action
- result = self._action_cb(element)
+ result = self._child_process()
except BstError as e:
elapsed = datetime.datetime.now() - starttime
if self._tries <= self._max_retries:
- self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+ self._message(MessageType.FAIL,
+ "Try #{} failed, retrying".format(self._tries),
elapsed=elapsed)
else:
- self._message(element, MessageType.FAIL, str(e),
+ self._message(MessageType.FAIL, str(e),
elapsed=elapsed, detail=e.detail,
logfile=filename, sandbox=e.sandbox)
- # Report changes in the workspace, even if there was a handled failure
- self._child_send_workspace()
+ self._queue.put(Envelope('child_data', self._child_process_data()))
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -344,18 +418,19 @@ class Job():
#
elapsed = datetime.datetime.now() - starttime
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self._message(element, MessageType.BUG, self.action_name,
+
+ self._message(MessageType.BUG, self.action_name,
elapsed=elapsed, detail=detail,
logfile=filename)
self._child_shutdown(1)
else:
# No exception occurred in the action
- self._child_send_workspace()
+ self._queue.put(Envelope('child_data', self._child_process_data()))
self._child_send_result(result)
elapsed = datetime.datetime.now() - starttime
- self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+ self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
logfile=filename)
# Shutdown needs to stay outside of the above context manager,
@@ -399,16 +474,6 @@ class Job():
envelope = Envelope('result', result)
self._queue.put(envelope)
- # _child_send_workspace()
- #
- # Sends the serialized workspace through the message queue, if any
- #
- def _child_send_workspace(self):
- workspace = self.element._get_workspace()
- if workspace:
- envelope = Envelope('workspace', workspace.to_dict())
- self._queue.put(envelope)
-
# _child_shutdown()
#
# Shuts down the child process by cleaning up and exiting the process
@@ -420,44 +485,6 @@ class Job():
self._queue.close()
sys.exit(exit_code)
- # _child_log()
- #
- # Logs a Message to the process's dedicated log file
- #
- # Args:
- # plugin (Plugin): The plugin to log for
- # message (Message): The message to log
- #
- def _child_log(self, plugin, message):
-
- with plugin._output_file() as output:
- INDENT = " "
- EMPTYTIME = "--:--:--"
-
- name = '[' + plugin.name + ']'
-
- fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
- detail = ''
- if message.detail is not None:
- fmt += "\n\n{detail}"
- detail = message.detail.rstrip('\n')
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- message_text = fmt.format(timecode=timecode,
- type=message.message_type.upper(),
- name=name,
- message=message.message,
- detail=detail)
-
- output.write('{}\n'.format(message_text))
- output.flush()
-
# _child_message_handler()
#
# A Context delegate for handling messages, this replaces the
@@ -471,16 +498,8 @@ class Job():
#
def _child_message_handler(self, message, context):
- # Tag them on the way out the door...
- message.action_name = self.action_name
- message.task_id = self.element._get_unique_id()
-
- # Use the plugin for the task for the output, not a plugin
- # which might be acting on behalf of the task
- plugin = _plugin_lookup(message.task_id)
-
# Log first
- self._child_log(plugin, message)
+ message = self._child_log(message)
if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
# Job will be retried, display failures as warnings in the frontend
@@ -520,7 +539,7 @@ class Job():
self.spawn()
return
- self._complete_cb(self, self.element, returncode == 0, self._result)
+ self._parent_complete(returncode == 0, self._result)
# _parent_process_envelope()
#
@@ -537,21 +556,22 @@ class Job():
if not self._listening:
return
- if envelope.message_type == 'message':
+ if envelope._message_type == 'message':
# Propagate received messages from children
# back through the context.
- self._scheduler.context.message(envelope.message)
- elif envelope.message_type == 'error':
+ self._scheduler.context.message(envelope._message)
+ elif envelope._message_type == 'error':
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
- set_last_task_error(envelope.message['domain'],
- envelope.message['reason'])
- elif envelope.message_type == 'result':
+ set_last_task_error(envelope._message['domain'],
+ envelope._message['reason'])
+ elif envelope._message_type == 'result':
assert self._result is None
- self._result = envelope.message
- elif envelope.message_type == 'workspace':
- self.workspace_dict = envelope.message
+ self._result = envelope._message
+ elif envelope._message_type == 'child_data':
+ assert self.child_data is None
+ self.child_data = envelope._message
else:
raise Exception()
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index e733776..648bebb 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -20,12 +20,13 @@
# Jürg Billeter <ju...@codethink.co.uk>
# System imports
+import os
from collections import deque
from enum import Enum
import traceback
# Local imports
-from ..job import Job
+from ..jobs import ElementJob
# BuildStream toplevel imports
from ..._exceptions import BstError, set_last_task_error
@@ -238,12 +239,15 @@ class Queue():
self.skipped_elements.append(element)
continue
+ logfile = self._element_log_path(element)
self.prepare(element)
- job = Job(scheduler, element, self.action_name,
- self.process, self._job_done,
- max_retries=self._max_retries)
- scheduler.job_starting(job)
+ job = ElementJob(scheduler, self.action_name,
+ logfile, element=element,
+ action_cb=self.process,
+ complete_cb=self._job_done,
+ max_retries=self._max_retries)
+ scheduler.job_starting(job, element)
job.spawn()
self.active_jobs.append(job)
@@ -266,11 +270,15 @@ class Queue():
# job (Job): The job which completed
#
def _update_workspaces(self, element, job):
+ workspace_dict = None
+ if job.child_data:
+ workspace_dict = job.child_data['workspace']
+
# Handle any workspace modifications now
#
- if job.workspace_dict:
+ if workspace_dict:
project = element._get_project()
- if project.workspaces.update_workspace(element.name, job.workspace_dict):
+ if project.workspaces.update_workspace(element.name, workspace_dict):
try:
project.workspaces.save_config()
except BstError as e:
@@ -343,7 +351,7 @@ class Queue():
self._scheduler.put_job_token(self.queue_type)
# Notify frontend
- self._scheduler.job_completed(self, job, success)
+ self._scheduler.job_completed(self, job, element, success)
self._scheduler.sched()
@@ -353,3 +361,16 @@ class Queue():
context = element._get_context()
message = Message(element._get_unique_id(), message_type, brief, **kwargs)
context.message(message)
+
+ def _element_log_path(self, element):
+ project = element._get_project()
+ context = element._get_context()
+
+ key = element._get_display_key()[1]
+ action = self.action_name.lower()
+ logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
+
+ directory = os.path.join(context.logdir, project.name, element.normal_name)
+
+ os.makedirs(directory, exist_ok=True)
+ return os.path.join(directory, logfile)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 9cbfd49..a475722 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -298,9 +298,9 @@ class Scheduler():
# Args:
# job (Job): The starting Job
#
- def job_starting(self, job):
+ def job_starting(self, job, element):
if self._job_start_callback:
- self._job_start_callback(job.element, job.action_name)
+ self._job_start_callback(element, job.action_name)
# job_completed():
#
@@ -311,9 +311,9 @@ class Scheduler():
# job (Job): The completed Job
# success (bool): Whether the Job completed with a success status
#
- def job_completed(self, queue, job, success):
+ def job_completed(self, queue, job, element, success):
if self._job_complete_callback:
- self._job_complete_callback(job.element, queue, job.action_name, success)
+ self._job_complete_callback(element, queue, job.action_name, success)
#######################################################
# Local Private Methods #