You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:56:11 UTC

[buildstream] 02/04: buildstream/_scheduler/*: Make Jobs abstract and element-independent

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ff7a00fdd755a9932f1f1aae937c944ef7f360f1
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Thu Apr 19 17:11:54 2018 +0100

    buildstream/_scheduler/*: Make Jobs abstract and element-independent
---
 buildstream/_scheduler/jobs/__init__.py   |   2 +
 buildstream/_scheduler/jobs/elementjob.py | 210 +++++++++++++++++++
 buildstream/_scheduler/{ => jobs}/job.py  | 324 ++++++++++++++++--------------
 buildstream/_scheduler/queues/queue.py    |  37 +++-
 buildstream/_scheduler/scheduler.py       |   8 +-
 5 files changed, 417 insertions(+), 164 deletions(-)

diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 0000000..f215c39
--- /dev/null
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1,2 @@
+from .job import Job
+from .elementjob import ElementJob
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 0000000..379e2d1
--- /dev/null
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,210 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+from ...plugin import _plugin_lookup
+from ... import _signals
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+#    scheduler (Scheduler): The scheduler
+#    action_name (str): The queue action name
+#    max_retries (int): The maximum number of retries
+#    action_cb (callable): The function to execute on the child
+#    complete_cb (callable): The function to execute when the job completes
+#    element (Element): The element to work on
+#
+# Here is the calling signature of the action_cb:
+#
+#     action_cb(element):
+#
+#     This function will be called in the child task
+#
+#     Args:
+#        element (Element): The element passed to the Job() constructor
+#
+#     Returns:
+#        (object): Any abstract simple python object, including a string, int,
+#                  bool, list or dict, this must be a simple serializable object.
+#
+# Here is the calling signature of the complete_cb:
+#
+#     complete_cb(job, element, success, result):
+#
+#     This function will be called when the child task completes
+#
+#     Args:
+#        job (Job): The job object which completed
+#        element (Element): The element passed to the Job() constructor
+#        success (bool): True if the action_cb did not raise an exception
+#        result (object): The deserialized object returned by the `action_cb`, or None
+#                         if `success` is False
+#
+class ElementJob(Job):
+    def __init__(self, *args, element, action_cb, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._element = element
+        self._action_cb = action_cb            # The action callable function
+        self._complete_cb = complete_cb        # The complete callable function
+
+    # _child_process()
+    #
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        return self._action_cb(self._element)
+
+    def _parent_complete(self, success, result):
+        self._complete_cb(self, self._element, success, self._result)
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+
+        with open(self._logfile, 'a') as log:
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently had something happening in the I/O stack
+                # then trying to reenter the I/O stack will fire a runtime error.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    # FIXME: Better logging
+
+                    log.write('\n\nAction {} for element {} forcefully terminated\n'
+                              .format(self.action_name, self._element.name))
+                    log.flush()
+                except RuntimeError:
+                    os.fsync(log.fileno())
+
+            self._element._set_log_handle(log)
+            with _signals.terminator(flush_log):
+                self._print_start_message(self._element, self._logfile)
+                yield self._logfile
+            self._element._set_log_handle(None)
+            self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(
+            Message(self._element._get_unique_id(),
+                    message_type,
+                    message,
+                    **args))
+
+    def _print_start_message(self, element, logfile):
+        self._message(MessageType.START, self.action_name, logfile=logfile)
+
+        # Print the element's environment at the beginning of any element's log file.
+        #
+        # This should probably be omitted for non-build tasks but it's harmless here
+        elt_env = element.get_environment()
+        env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+        self._message(MessageType.LOG,
+                      "Build environment for element {}".format(element.name),
+                      detail=env_dump, logfile=logfile)
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # Arguments:
+    #     message (Message): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        # Tag them on the way out the door...
+        message.action_name = self.action_name
+        message.task_id = self._element._get_unique_id()
+
+        # Use the plugin for the task for the output, not a plugin
+        # which might be acting on behalf of the task
+        plugin = _plugin_lookup(message.task_id)
+
+        with plugin._output_file() as output:
+            message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
+            output.write('{}\n'.format(message_text))
+            output.flush()
+
+        return message
+
+    # _child_process_data()
+    #
+    # Retrieve additional data (the element's workspace, if any) to be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        data = {}
+
+        workspace = self._element._get_workspace()
+        if workspace is not None:
+            data['workspace'] = workspace.to_dict()
+
+        return data
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py
similarity index 69%
rename from buildstream/_scheduler/job.py
rename to buildstream/_scheduler/jobs/job.py
index b8b81f2..6728ae0 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -27,20 +27,21 @@ import datetime
 import traceback
 import asyncio
 import multiprocessing
-from ruamel import yaml
+from contextlib import contextmanager
+
+import psutil
 
 # BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType, unconditional_messages
-from ..plugin import _plugin_lookup
-from .. import _signals, utils
+from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._message import MessageType, unconditional_messages
+from ... import _signals, utils
 
 
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
+        self._message_type = message_type
+        self._message = message
 
 
 # Process class that doesn't call waitpid on its own.
@@ -55,54 +56,24 @@ class Process(multiprocessing.Process):
 # Job()
 #
 # The Job object represents a parallel task, when calling Job.spawn(),
-# the given `action_cb` will be called in parallel to the calling process,
-# and `complete_cb` will be called with the action result in the calling
-# process when the job completes.
+# `Job._child_process` will be called in parallel to the
+# calling process, and `Job._parent_complete` will be called with the
+# action result in the calling process when the job completes.
 #
 # Args:
 #    scheduler (Scheduler): The scheduler
-#    element (Element): The element to operate on
 #    action_name (str): The queue action name
-#    action_cb (callable): The action function
-#    complete_cb (callable): The function to call when complete
 #    max_retries (int): The maximum number of retries
 #
-# Here is the calling signature of the action_cb:
-#
-#     action_cb():
-#
-#     This function will be called in the child task
-#
-#     Args:
-#        element (Element): The element passed to the Job() constructor
-#
-#     Returns:
-#        (object): Any abstract simple python object, including a string, int,
-#                  bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-#     complete_cb():
-#
-#     This function will be called when the child task completes
-#
-#     Args:
-#        job (Job): The job object which completed
-#        element (Element): The element passed to the Job() constructor
-#        success (bool): True if the action_cb did not raise an exception
-#        result (object): The deserialized object returned by the `action_cb`, or None
-#                         if `success` is False
-#
 class Job():
 
-    def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
+    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
 
         #
         # Public members
         #
-        self.element = element           # The element we're processing
         self.action_name = action_name   # The action name for the Queue
-        self.workspace_dict = None       # A serialized Workspace object, after any modifications
+        self.child_data = None
 
         #
         # Private members
@@ -111,13 +82,12 @@ class Job():
         self._queue = multiprocessing.Queue()  # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
-        self._action_cb = action_cb            # The action callable function
-        self._complete_cb = complete_cb        # The complete callable function
         self._listening = False                # Whether the parent is currently listening
         self._suspended = False                # Whether this job is currently suspended
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
+        self._logfile = logfile
 
     # spawn()
     #
@@ -153,8 +123,7 @@ class Job():
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self._message(self.element, MessageType.STATUS,
-                      "{} terminating".format(self.action_name))
+        self._message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -185,9 +154,15 @@ class Job():
     def kill(self):
 
         # Force kill
-        self._message(self.element, MessageType.WARN,
+        self._message(MessageType.WARN,
                       "{} did not terminate gracefully, killing".format(self.action_name))
-        utils._kill_process_tree(self._process.pid)
+
+        try:
+            utils._kill_process_tree(self._process.pid)
+        # This can happen if the process died of its own accord before
+        # we try to kill it
+        except psutil.NoSuchProcess:
+            return
 
     # suspend()
     #
@@ -195,7 +170,7 @@ class Job():
     #
     def suspend(self):
         if not self._suspended:
-            self._message(self.element, MessageType.STATUS,
+            self._message(MessageType.STATUS,
                           "{} suspending".format(self.action_name))
 
             try:
@@ -220,42 +195,152 @@ class Job():
     def resume(self, silent=False):
         if self._suspended:
             if not silent:
-                self._message(self.element, MessageType.STATUS,
+                self._message(MessageType.STATUS,
                               "{} resuming".format(self.action_name))
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
 
     #######################################################
-    #                  Local Private Methods              #
+    #                  Abstract Methods                   #
     #######################################################
+    # _parent_complete()
     #
-    # Methods prefixed with the word 'child' take place in the child process
+    # This will be executed after the job finishes, and is expected to
+    # pass the result to the main thread.
     #
-    # Methods prefixed with the word 'parent' take place in the parent process
+    # Args:
+    #    success (bool): Whether the job was successful.
+    #    result (any): The result returned by _child_process().
     #
-    # Other methods can be called in both child or parent processes
+    def _parent_complete(self, success, result):
+        raise ImplError("Job '{kind}' does not implement _parent_complete()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process()
     #
-    #######################################################
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        raise ImplError("Job '{kind}' does not implement _child_process()"
+                        .format(kind=type(self).__name__))
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        raise ImplError("Job '{kind}' does not implement _child_logging_enabled()"
+                        .format(kind=type(self).__name__))
 
     # _message():
     #
     # Sends a message to the frontend
     #
     # Args:
-    #    plugin (Plugin): The plugin to send a message for
     #    message_type (MessageType): The type of message to send
     #    message (str): The message
     #    kwargs: Remaining Message() constructor arguments
     #
-    def _message(self, plugin, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(plugin._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
+    def _message(self, message_type, message, **kwargs):
+        raise ImplError("Job '{kind}' does not implement _message()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        return {}
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # This method is also expected to add process-specific information
+    # to the message (notably, action_name and task_id).
+    #
+    # Arguments:
+    #     message (Message): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        raise ImplError("Job '{kind}' does not implement _child_log()"
+                        .format(kind=type(self).__name__))
+
+    #######################################################
+    #                  Local Private Methods              #
+    #######################################################
+    #
+    # Methods prefixed with the word 'child' take place in the child process
+    #
+    # Methods prefixed with the word 'parent' take place in the parent process
+    #
+    # Other methods can be called in both child or parent processes
+    #
+    #######################################################
+
+    # _format_frontend_message()
+    #
+    # Format a message from the frontend for logging purposes. This
+    # will prepend a time code and add other information to help
+    # determine what happened.
+    #
+    # Args:
+    #    message (Message) - The message to create a text from.
+    #    name (str) - A name for the executing context.
+    #
+    # Returns:
+    #    (str) The text to log.
+    #
+    def _format_frontend_message(self, message, name):
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+
+        detail = ''
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip('\n')
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        return template.format(timecode=timecode,
+                               type=message.message_type.upper(),
+                               name=name,
+                               message=message.message,
+                               detail=detail)
 
     # _child_action()
     #
@@ -266,7 +351,7 @@ class Job():
     #
     def _child_action(self, queue):
 
-        element = self.element
+        logfile = self._logfile
 
         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -302,35 +387,24 @@ class Job():
         # Time, log and and run the action function
         #
         with _signals.suspendable(stop_time, resume_time), \
-            element._logging_enabled(self.action_name) as filename:
-
-            self._message(element, MessageType.START, self.action_name, logfile=filename)
-
-            # Print the element's environment at the beginning of any element's log file.
-            #
-            # This should probably be omitted for non-build tasks but it's harmless here
-            elt_env = element.get_environment()
-            env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-            self._message(element, MessageType.LOG,
-                          "Build environment for element {}".format(element.name),
-                          detail=env_dump, logfile=filename)
+            self._child_logging_enabled(logfile) as filename:
 
             try:
                 # Try the task action
-                result = self._action_cb(element)
+                result = self._child_process()
             except BstError as e:
                 elapsed = datetime.datetime.now() - starttime
 
                 if self._tries <= self._max_retries:
-                    self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+                    self._message(MessageType.FAIL,
+                                  "Try #{} failed, retrying".format(self._tries),
                                   elapsed=elapsed)
                 else:
-                    self._message(element, MessageType.FAIL, str(e),
+                    self._message(MessageType.FAIL, str(e),
                                   elapsed=elapsed, detail=e.detail,
                                   logfile=filename, sandbox=e.sandbox)
 
-                # Report changes in the workspace, even if there was a handled failure
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
 
                 # Report the exception to the parent (for internal testing purposes)
                 self._child_send_error(e)
@@ -344,18 +418,19 @@ class Job():
                 #
                 elapsed = datetime.datetime.now() - starttime
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
-                self._message(element, MessageType.BUG, self.action_name,
+
+                self._message(MessageType.BUG, self.action_name,
                               elapsed=elapsed, detail=detail,
                               logfile=filename)
                 self._child_shutdown(1)
 
             else:
                 # No exception occurred in the action
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - starttime
-                self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+                self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
                               logfile=filename)
 
                 # Shutdown needs to stay outside of the above context manager,
@@ -399,16 +474,6 @@ class Job():
             envelope = Envelope('result', result)
             self._queue.put(envelope)
 
-    # _child_send_workspace()
-    #
-    # Sends the serialized workspace through the message queue, if any
-    #
-    def _child_send_workspace(self):
-        workspace = self.element._get_workspace()
-        if workspace:
-            envelope = Envelope('workspace', workspace.to_dict())
-            self._queue.put(envelope)
-
     # _child_shutdown()
     #
     # Shuts down the child process by cleaning up and exiting the process
@@ -420,44 +485,6 @@ class Job():
         self._queue.close()
         sys.exit(exit_code)
 
-    # _child_log()
-    #
-    # Logs a Message to the process's dedicated log file
-    #
-    # Args:
-    #    plugin (Plugin): The plugin to log for
-    #    message (Message): The message to log
-    #
-    def _child_log(self, plugin, message):
-
-        with plugin._output_file() as output:
-            INDENT = "    "
-            EMPTYTIME = "--:--:--"
-
-            name = '[' + plugin.name + ']'
-
-            fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
-            detail = ''
-            if message.detail is not None:
-                fmt += "\n\n{detail}"
-                detail = message.detail.rstrip('\n')
-                detail = INDENT + INDENT.join(detail.splitlines(True))
-
-            timecode = EMPTYTIME
-            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
-                minutes, seconds = divmod(remainder, 60)
-                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-            message_text = fmt.format(timecode=timecode,
-                                      type=message.message_type.upper(),
-                                      name=name,
-                                      message=message.message,
-                                      detail=detail)
-
-            output.write('{}\n'.format(message_text))
-            output.flush()
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the
@@ -471,16 +498,8 @@ class Job():
     #
     def _child_message_handler(self, message, context):
 
-        # Tag them on the way out the door...
-        message.action_name = self.action_name
-        message.task_id = self.element._get_unique_id()
-
-        # Use the plugin for the task for the output, not a plugin
-        # which might be acting on behalf of the task
-        plugin = _plugin_lookup(message.task_id)
-
         # Log first
-        self._child_log(plugin, message)
+        message = self._child_log(message)
 
         if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
             # Job will be retried, display failures as warnings in the frontend
@@ -520,7 +539,7 @@ class Job():
             self.spawn()
             return
 
-        self._complete_cb(self, self.element, returncode == 0, self._result)
+        self._parent_complete(returncode == 0, self._result)
 
     # _parent_process_envelope()
     #
@@ -537,21 +556,22 @@ class Job():
         if not self._listening:
             return
 
-        if envelope.message_type == 'message':
+        if envelope._message_type == 'message':
             # Propagate received messages from children
             # back through the context.
-            self._scheduler.context.message(envelope.message)
-        elif envelope.message_type == 'error':
+            self._scheduler.context.message(envelope._message)
+        elif envelope._message_type == 'error':
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message['domain'],
-                                envelope.message['reason'])
-        elif envelope.message_type == 'result':
+            set_last_task_error(envelope._message['domain'],
+                                envelope._message['reason'])
+        elif envelope._message_type == 'result':
             assert self._result is None
-            self._result = envelope.message
-        elif envelope.message_type == 'workspace':
-            self.workspace_dict = envelope.message
+            self._result = envelope._message
+        elif envelope._message_type == 'child_data':
+            assert self.child_data is None
+            self.child_data = envelope._message
         else:
             raise Exception()
 
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index e733776..648bebb 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -20,12 +20,13 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # System imports
+import os
 from collections import deque
 from enum import Enum
 import traceback
 
 # Local imports
-from ..job import Job
+from ..jobs import ElementJob
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
@@ -238,12 +239,15 @@ class Queue():
                 self.skipped_elements.append(element)
                 continue
 
+            logfile = self._element_log_path(element)
             self.prepare(element)
 
-            job = Job(scheduler, element, self.action_name,
-                      self.process, self._job_done,
-                      max_retries=self._max_retries)
-            scheduler.job_starting(job)
+            job = ElementJob(scheduler, self.action_name,
+                             logfile, element=element,
+                             action_cb=self.process,
+                             complete_cb=self._job_done,
+                             max_retries=self._max_retries)
+            scheduler.job_starting(job, element)
 
             job.spawn()
             self.active_jobs.append(job)
@@ -266,11 +270,15 @@ class Queue():
     #    job (Job): The job which completed
     #
     def _update_workspaces(self, element, job):
+        workspace_dict = None
+        if job.child_data:
+            workspace_dict = job.child_data['workspace']
+
         # Handle any workspace modifications now
         #
-        if job.workspace_dict:
+        if workspace_dict:
             project = element._get_project()
-            if project.workspaces.update_workspace(element.name, job.workspace_dict):
+            if project.workspaces.update_workspace(element.name, workspace_dict):
                 try:
                     project.workspaces.save_config()
                 except BstError as e:
@@ -343,7 +351,7 @@ class Queue():
         self._scheduler.put_job_token(self.queue_type)
 
         # Notify frontend
-        self._scheduler.job_completed(self, job, success)
+        self._scheduler.job_completed(self, job, element, success)
 
         self._scheduler.sched()
 
@@ -353,3 +361,16 @@ class Queue():
         context = element._get_context()
         message = Message(element._get_unique_id(), message_type, brief, **kwargs)
         context.message(message)
+
+    def _element_log_path(self, element):
+        project = element._get_project()
+        context = element._get_context()
+
+        key = element._get_display_key()[1]
+        action = self.action_name.lower()
+        logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
+
+        directory = os.path.join(context.logdir, project.name, element.normal_name)
+
+        os.makedirs(directory, exist_ok=True)
+        return os.path.join(directory, logfile)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 9cbfd49..a475722 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -298,9 +298,9 @@ class Scheduler():
     # Args:
     #    job (Job): The starting Job
     #
-    def job_starting(self, job):
+    def job_starting(self, job, element):
         if self._job_start_callback:
-            self._job_start_callback(job.element, job.action_name)
+            self._job_start_callback(element, job.action_name)
 
     # job_completed():
     #
@@ -311,9 +311,9 @@ class Scheduler():
     #    job (Job): The completed Job
     #    success (bool): Whether the Job completed with a success status
     #
-    def job_completed(self, queue, job, success):
+    def job_completed(self, queue, job, element, success):
         if self._job_complete_callback:
-            self._job_complete_callback(job.element, queue, job.action_name, success)
+            self._job_complete_callback(element, queue, job.action_name, success)
 
     #######################################################
     #                  Local Private Methods              #