Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:56:09 UTC

[buildstream] branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes created (now 92a6e34)

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a change to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at 92a6e34  buildstream/_scheduler/*.py: Make job submission a queue job

This branch includes the following new commits:

     new 8264e69  buildstream/_scheduler/*queue.py: Move queues to a subdirectory
     new ff7a00f  buildstream/_scheduler/*: Make Jobs abstract and element-independent
     new 8e59ca0  buildstream/_scheduler/*: Add QueueRunner
     new 92a6e34  buildstream/_scheduler/*.py: Make job submission a queue job

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.



[buildstream] 02/04: buildstream/_scheduler/*: Make Jobs abstract and element-independent

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ff7a00fdd755a9932f1f1aae937c944ef7f360f1
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Thu Apr 19 17:11:54 2018 +0100

    buildstream/_scheduler/*: Make Jobs abstract and element-independent
---
 buildstream/_scheduler/jobs/__init__.py   |   2 +
 buildstream/_scheduler/jobs/elementjob.py | 210 +++++++++++++++++++
 buildstream/_scheduler/{ => jobs}/job.py  | 324 ++++++++++++++++--------------
 buildstream/_scheduler/queues/queue.py    |  37 +++-
 buildstream/_scheduler/scheduler.py       |   8 +-
 5 files changed, 417 insertions(+), 164 deletions(-)

diff --git a/buildstream/_scheduler/jobs/__init__.py b/buildstream/_scheduler/jobs/__init__.py
new file mode 100644
index 0000000..f215c39
--- /dev/null
+++ b/buildstream/_scheduler/jobs/__init__.py
@@ -0,0 +1,2 @@
+from .job import Job
+from .elementjob import ElementJob
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
new file mode 100644
index 0000000..379e2d1
--- /dev/null
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -0,0 +1,210 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+import os
+from contextlib import contextmanager
+
+from ruamel import yaml
+
+from ..._message import Message, MessageType
+from ...plugin import _plugin_lookup
+from ... import _signals
+
+from .job import Job
+
+
+# ElementJob()
+#
+# A job to run an element's commands. When this job is spawned,
+# `action_cb` will be called, and when it completes `complete_cb` will
+# be called.
+#
+# Args:
+#    scheduler (Scheduler): The scheduler
+#    action_name (str): The queue action name
+#    max_retries (int): The maximum number of retries
+#    action_cb (callable): The function to execute on the child
+#    complete_cb (callable): The function to execute when the job completes
+#    element (Element): The element to work on
+#
+# Here is the calling signature of the action_cb:
+#
+#     action_cb(element):
+#
+#     This function will be called in the child task
+#
+#     Args:
+#        element (Element): The element passed to the Job() constructor
+#
+#     Returns:
+#        (object): Any simple Python object, such as a string, int,
+#                  bool, list or dict; it must be serializable.
+#
+# Here is the calling signature of the complete_cb:
+#
+#     complete_cb(job, element, success, result):
+#
+#     This function will be called when the child task completes
+#
+#     Args:
+#        job (Job): The job object which completed
+#        element (Element): The element passed to the Job() constructor
+#        success (bool): True if the action_cb did not raise an exception
+#        result (object): The deserialized object returned by the `action_cb`, or None
+#                         if `success` is False
+#
+class ElementJob(Job):
+    def __init__(self, *args, element, action_cb, complete_cb, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._element = element
+        self._action_cb = action_cb            # The action callable function
+        self._complete_cb = complete_cb        # The complete callable function
+
+    # _child_process()
+    #
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        return self._action_cb(self._element)
+
+    def _parent_complete(self, success, result):
+        self._complete_cb(self, self._element, success, result)
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        self._logfile = logfile.format(pid=os.getpid())
+
+        with open(self._logfile, 'a') as log:
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently has something happening in the I/O stack,
+                # re-entering the I/O stack will raise a RuntimeError.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    # FIXME: Better logging
+
+                    log.write('\n\nAction {} for element {} forcefully terminated\n'
+                              .format(self.action_name, self._element.name))
+                    log.flush()
+                except RuntimeError:
+                    os.fsync(log.fileno())
+
+            self._element._set_log_handle(log)
+            with _signals.terminator(flush_log):
+                self._print_start_message(self._element, self._logfile)
+                yield self._logfile
+            self._element._set_log_handle(None)
+            self._logfile = None
+
+    # _message():
+    #
+    # Sends a message to the frontend
+    #
+    # Args:
+    #    message_type (MessageType): The type of message to send
+    #    message (str): The message
+    #    kwargs: Remaining Message() constructor arguments
+    #
+    def _message(self, message_type, message, **kwargs):
+        args = dict(kwargs)
+        args['scheduler'] = True
+        self._scheduler.context.message(
+            Message(self._element._get_unique_id(),
+                    message_type,
+                    message,
+                    **args))
+
+    def _print_start_message(self, element, logfile):
+        self._message(MessageType.START, self.action_name, logfile=logfile)
+
+        # Print the element's environment at the beginning of any element's log file.
+        #
+        # This should probably be omitted for non-build tasks but it's harmless here
+        elt_env = element.get_environment()
+        env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
+        self._message(MessageType.LOG,
+                      "Build environment for element {}".format(element.name),
+                      detail=env_dump, logfile=logfile)
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # Args:
+    #     message (str): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        # Tag them on the way out the door...
+        message.action_name = self.action_name
+        message.task_id = self._element._get_unique_id()
+
+        # Use the plugin for the task for the output, not a plugin
+        # which might be acting on behalf of the task
+        plugin = _plugin_lookup(message.task_id)
+
+        with plugin._output_file() as output:
+            message_text = self._format_frontend_message(message, '[{}]'.format(plugin.name))
+            output.write('{}\n'.format(message_text))
+            output.flush()
+
+        return message
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        data = {}
+
+        workspace = self._element._get_workspace()
+        if workspace is not None:
+            data['workspace'] = workspace.to_dict()
+
+        return data
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/jobs/job.py
similarity index 69%
rename from buildstream/_scheduler/job.py
rename to buildstream/_scheduler/jobs/job.py
index b8b81f2..6728ae0 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -27,20 +27,21 @@ import datetime
 import traceback
 import asyncio
 import multiprocessing
-from ruamel import yaml
+from contextlib import contextmanager
+
+import psutil
 
 # BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType, unconditional_messages
-from ..plugin import _plugin_lookup
-from .. import _signals, utils
+from ..._exceptions import ImplError, BstError, set_last_task_error
+from ..._message import MessageType, unconditional_messages
+from ... import _signals, utils
 
 
 # Used to distinguish between status messages and return values
 class Envelope():
     def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
+        self._message_type = message_type
+        self._message = message
 
 
 # Process class that doesn't call waitpid on its own.
@@ -55,54 +56,24 @@ class Process(multiprocessing.Process):
 # Job()
 #
 # The Job object represents a parallel task; when calling Job.spawn(),
-# the given `action_cb` will be called in parallel to the calling process,
-# and `complete_cb` will be called with the action result in the calling
-# process when the job completes.
+# the given `Job._child_process` will be called in parallel to the
+# calling process, and `Job._parent_complete` will be called with the
+# action result in the calling process when the job completes.
 #
 # Args:
 #    scheduler (Scheduler): The scheduler
-#    element (Element): The element to operate on
 #    action_name (str): The queue action name
-#    action_cb (callable): The action function
-#    complete_cb (callable): The function to call when complete
 #    max_retries (int): The maximum number of retries
 #
-# Here is the calling signature of the action_cb:
-#
-#     action_cb():
-#
-#     This function will be called in the child task
-#
-#     Args:
-#        element (Element): The element passed to the Job() constructor
-#
-#     Returns:
-#        (object): Any abstract simple python object, including a string, int,
-#                  bool, list or dict, this must be a simple serializable object.
-#
-# Here is the calling signature of the complete_cb:
-#
-#     complete_cb():
-#
-#     This function will be called when the child task completes
-#
-#     Args:
-#        job (Job): The job object which completed
-#        element (Element): The element passed to the Job() constructor
-#        success (bool): True if the action_cb did not raise an exception
-#        result (object): The deserialized object returned by the `action_cb`, or None
-#                         if `success` is False
-#
 class Job():
 
-    def __init__(self, scheduler, element, action_name, action_cb, complete_cb, *, max_retries=0):
+    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
 
         #
         # Public members
         #
-        self.element = element           # The element we're processing
         self.action_name = action_name   # The action name for the Queue
-        self.workspace_dict = None       # A serialized Workspace object, after any modifications
+        self.child_data = None
 
         #
         # Private members
@@ -111,13 +82,12 @@ class Job():
         self._queue = multiprocessing.Queue()  # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
-        self._action_cb = action_cb            # The action callable function
-        self._complete_cb = complete_cb        # The complete callable function
         self._listening = False                # Whether the parent is currently listening
         self._suspended = False                # Whether this job is currently suspended
         self._max_retries = max_retries        # Maximum number of automatic retries
         self._result = None                    # Return value of child action in the parent
         self._tries = 0                        # Try count, for retryable jobs
+        self._logfile = logfile
 
     # spawn()
     #
@@ -153,8 +123,7 @@ class Job():
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self._message(self.element, MessageType.STATUS,
-                      "{} terminating".format(self.action_name))
+        self._message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -185,9 +154,15 @@ class Job():
     def kill(self):
 
         # Force kill
-        self._message(self.element, MessageType.WARN,
+        self._message(MessageType.WARN,
                       "{} did not terminate gracefully, killing".format(self.action_name))
-        utils._kill_process_tree(self._process.pid)
+
+        try:
+            utils._kill_process_tree(self._process.pid)
+        # This can happen if the process died of its own accord before
+        # we try to kill it
+        except psutil.NoSuchProcess:
+            return
 
     # suspend()
     #
@@ -195,7 +170,7 @@ class Job():
     #
     def suspend(self):
         if not self._suspended:
-            self._message(self.element, MessageType.STATUS,
+            self._message(MessageType.STATUS,
                           "{} suspending".format(self.action_name))
 
             try:
@@ -220,42 +195,152 @@ class Job():
     def resume(self, silent=False):
         if self._suspended:
             if not silent:
-                self._message(self.element, MessageType.STATUS,
+                self._message(MessageType.STATUS,
                               "{} resuming".format(self.action_name))
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
 
     #######################################################
-    #                  Local Private Methods              #
+    #                  Abstract Methods                   #
     #######################################################
+    # _parent_complete()
     #
-    # Methods prefixed with the word 'child' take place in the child process
+    # This will be executed after the job finishes, and is expected to
+    # pass the result to the main thread.
     #
-    # Methods prefixed with the word 'parent' take place in the parent process
+    # Args:
+    #    success (bool): Whether the job was successful.
+    #    result (any): The result returned by _child_process().
     #
-    # Other methods can be called in both child or parent processes
+    def _parent_complete(self, success, result):
+        raise ImplError("Job '{kind}' does not implement _parent_complete()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process()
     #
-    #######################################################
+    # This will be executed after fork(), and is intended to perform
+    # the job's task.
+    #
+    # Returns:
+    #    (any): A (simple!) object to be returned to the main thread
+    #           as the result.
+    #
+    def _child_process(self):
+        raise ImplError("Job '{kind}' does not implement _child_process()"
+                        .format(kind=type(self).__name__))
+
+    # _child_logging_enabled()
+    #
+    # Start the log for this job. This function will be given a
+    # template string for the path to a log file - this will contain
+    # "{pid}", which should be replaced with the current process'
+    # PID. (i.e., call something like `logfile.format(pid=os.getpid())`).
+    #
+    # Args:
+    #    logfile (str): A template string that points to the logfile
+    #                   that should be used - replace {pid} first.
+    #
+    # Yields:
+    #    (str) The path to the logfile with {pid} replaced.
+    #
+    @contextmanager
+    def _child_logging_enabled(self, logfile):
+        raise ImplError("Job '{kind}' does not implement _child_logging_enabled()"
+                        .format(kind=type(self).__name__))
 
     # _message():
     #
     # Sends a message to the frontend
     #
     # Args:
-    #    plugin (Plugin): The plugin to send a message for
     #    message_type (MessageType): The type of message to send
     #    message (str): The message
     #    kwargs: Remaining Message() constructor arguments
     #
-    def _message(self, plugin, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(plugin._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
+    def _message(self, message_type, message, **kwargs):
+        raise ImplError("Job '{kind}' does not implement _message()"
+                        .format(kind=type(self).__name__))
+
+    # _child_process_data()
+    #
+    # Abstract method to retrieve additional data that should be
+    # returned to the parent process. Note that the job result is
+    # retrieved independently.
+    #
+    # Values can later be retrieved in Job.child_data.
+    #
+    # Returns:
+    #    (dict) A dict containing values later to be read by _process_sync_data
+    #
+    def _child_process_data(self):
+        return {}
+
+    # _child_log()
+    #
+    # Log a message returned by the frontend's main message handler
+    # and return it to the main process.
+    #
+    # This method is also expected to add process-specific information
+    # to the message (notably, action_name and task_id).
+    #
+    # Args:
+    #     message (str): The message to log
+    #
+    # Returns:
+    #     message (Message): A message object
+    #
+    def _child_log(self, message):
+        raise ImplError("Job '{kind}' does not implement _child_log()"
+                        .format(kind=type(self).__name__))
+
+    #######################################################
+    #                  Local Private Methods              #
+    #######################################################
+    #
+    # Methods prefixed with the word 'child' take place in the child process
+    #
+    # Methods prefixed with the word 'parent' take place in the parent process
+    #
+    # Other methods can be called in both child or parent processes
+    #
+    #######################################################
+
+    # _format_frontend_message()
+    #
+    # Format a message from the frontend for logging purposes. This
+    # will prepend a time code and add other information to help
+    # determine what happened.
+    #
+    # Args:
+    #    message (Message) - The message to create a text from.
+    #    name (str) - A name for the executing context.
+    #
+    # Returns:
+    #    (str) The text to log.
+    #
+    def _format_frontend_message(self, message, name):
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
+
+        detail = ''
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip('\n')
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60**2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        return template.format(timecode=timecode,
+                               type=message.message_type.upper(),
+                               name=name,
+                               message=message.message,
+                               detail=detail)
 
     # _child_action()
     #
@@ -266,7 +351,7 @@ class Job():
     #
     def _child_action(self, queue):
 
-        element = self.element
+        logfile = self._logfile
 
         # This avoids some SIGTSTP signals from grandchildren
         # getting propagated up to the master process
@@ -302,35 +387,24 @@ class Job():
         # Time, log and run the action function
         #
         with _signals.suspendable(stop_time, resume_time), \
-            element._logging_enabled(self.action_name) as filename:
-
-            self._message(element, MessageType.START, self.action_name, logfile=filename)
-
-            # Print the element's environment at the beginning of any element's log file.
-            #
-            # This should probably be omitted for non-build tasks but it's harmless here
-            elt_env = element.get_environment()
-            env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-            self._message(element, MessageType.LOG,
-                          "Build environment for element {}".format(element.name),
-                          detail=env_dump, logfile=filename)
+            self._child_logging_enabled(logfile) as filename:
 
             try:
                 # Try the task action
-                result = self._action_cb(element)
+                result = self._child_process()
             except BstError as e:
                 elapsed = datetime.datetime.now() - starttime
 
                 if self._tries <= self._max_retries:
-                    self._message(element, MessageType.FAIL, "Try #{} failed, retrying".format(self._tries),
+                    self._message(MessageType.FAIL,
+                                  "Try #{} failed, retrying".format(self._tries),
                                   elapsed=elapsed)
                 else:
-                    self._message(element, MessageType.FAIL, str(e),
+                    self._message(MessageType.FAIL, str(e),
                                   elapsed=elapsed, detail=e.detail,
                                   logfile=filename, sandbox=e.sandbox)
 
-                # Report changes in the workspace, even if there was a handled failure
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
 
                 # Report the exception to the parent (for internal testing purposes)
                 self._child_send_error(e)
@@ -344,18 +418,19 @@ class Job():
                 #
                 elapsed = datetime.datetime.now() - starttime
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
-                self._message(element, MessageType.BUG, self.action_name,
+
+                self._message(MessageType.BUG, self.action_name,
                               elapsed=elapsed, detail=detail,
                               logfile=filename)
                 self._child_shutdown(1)
 
             else:
                 # No exception occurred in the action
-                self._child_send_workspace()
+                self._queue.put(Envelope('child_data', self._child_process_data()))
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - starttime
-                self._message(element, MessageType.SUCCESS, self.action_name, elapsed=elapsed,
+                self._message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
                               logfile=filename)
 
                 # Shutdown needs to stay outside of the above context manager,
@@ -399,16 +474,6 @@ class Job():
             envelope = Envelope('result', result)
             self._queue.put(envelope)
 
-    # _child_send_workspace()
-    #
-    # Sends the serialized workspace through the message queue, if any
-    #
-    def _child_send_workspace(self):
-        workspace = self.element._get_workspace()
-        if workspace:
-            envelope = Envelope('workspace', workspace.to_dict())
-            self._queue.put(envelope)
-
     # _child_shutdown()
     #
     # Shuts down the child process by cleaning up and exiting the process
@@ -420,44 +485,6 @@ class Job():
         self._queue.close()
         sys.exit(exit_code)
 
-    # _child_log()
-    #
-    # Logs a Message to the process's dedicated log file
-    #
-    # Args:
-    #    plugin (Plugin): The plugin to log for
-    #    message (Message): The message to log
-    #
-    def _child_log(self, plugin, message):
-
-        with plugin._output_file() as output:
-            INDENT = "    "
-            EMPTYTIME = "--:--:--"
-
-            name = '[' + plugin.name + ']'
-
-            fmt = "[{timecode: <8}] {type: <7} {name: <15}: {message}"
-            detail = ''
-            if message.detail is not None:
-                fmt += "\n\n{detail}"
-                detail = message.detail.rstrip('\n')
-                detail = INDENT + INDENT.join(detail.splitlines(True))
-
-            timecode = EMPTYTIME
-            if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-                hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 * 60)
-                minutes, seconds = divmod(remainder, 60)
-                timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-            message_text = fmt.format(timecode=timecode,
-                                      type=message.message_type.upper(),
-                                      name=name,
-                                      message=message.message,
-                                      detail=detail)
-
-            output.write('{}\n'.format(message_text))
-            output.flush()
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the
@@ -471,16 +498,8 @@ class Job():
     #
     def _child_message_handler(self, message, context):
 
-        # Tag them on the way out the door...
-        message.action_name = self.action_name
-        message.task_id = self.element._get_unique_id()
-
-        # Use the plugin for the task for the output, not a plugin
-        # which might be acting on behalf of the task
-        plugin = _plugin_lookup(message.task_id)
-
         # Log first
-        self._child_log(plugin, message)
+        message = self._child_log(message)
 
         if message.message_type == MessageType.FAIL and self._tries <= self._max_retries:
             # Job will be retried, display failures as warnings in the frontend
@@ -520,7 +539,7 @@ class Job():
             self.spawn()
             return
 
-        self._complete_cb(self, self.element, returncode == 0, self._result)
+        self._parent_complete(returncode == 0, self._result)
 
     # _parent_process_envelope()
     #
@@ -537,21 +556,22 @@ class Job():
         if not self._listening:
             return
 
-        if envelope.message_type == 'message':
+        if envelope._message_type == 'message':
             # Propagate received messages from children
             # back through the context.
-            self._scheduler.context.message(envelope.message)
-        elif envelope.message_type == 'error':
+            self._scheduler.context.message(envelope._message)
+        elif envelope._message_type == 'error':
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
             # is currently managed in _exceptions.py
-            set_last_task_error(envelope.message['domain'],
-                                envelope.message['reason'])
-        elif envelope.message_type == 'result':
+            set_last_task_error(envelope._message['domain'],
+                                envelope._message['reason'])
+        elif envelope._message_type == 'result':
             assert self._result is None
-            self._result = envelope.message
-        elif envelope.message_type == 'workspace':
-            self.workspace_dict = envelope.message
+            self._result = envelope._message
+        elif envelope._message_type == 'child_data':
+            assert self.child_data is None
+            self.child_data = envelope._message
         else:
             raise Exception()
 
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index e733776..648bebb 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -20,12 +20,13 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # System imports
+import os
 from collections import deque
 from enum import Enum
 import traceback
 
 # Local imports
-from ..job import Job
+from ..jobs import ElementJob
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
@@ -238,12 +239,15 @@ class Queue():
                 self.skipped_elements.append(element)
                 continue
 
+            logfile = self._element_log_path(element)
             self.prepare(element)
 
-            job = Job(scheduler, element, self.action_name,
-                      self.process, self._job_done,
-                      max_retries=self._max_retries)
-            scheduler.job_starting(job)
+            job = ElementJob(scheduler, self.action_name,
+                             logfile, element=element,
+                             action_cb=self.process,
+                             complete_cb=self._job_done,
+                             max_retries=self._max_retries)
+            scheduler.job_starting(job, element)
 
             job.spawn()
             self.active_jobs.append(job)
@@ -266,11 +270,15 @@ class Queue():
     #    job (Job): The job which completed
     #
     def _update_workspaces(self, element, job):
+        workspace_dict = None
+        if job.child_data:
+            workspace_dict = job.child_data['workspace']
+
         # Handle any workspace modifications now
         #
-        if job.workspace_dict:
+        if workspace_dict:
             project = element._get_project()
-            if project.workspaces.update_workspace(element.name, job.workspace_dict):
+            if project.workspaces.update_workspace(element.name, workspace_dict):
                 try:
                     project.workspaces.save_config()
                 except BstError as e:
@@ -343,7 +351,7 @@ class Queue():
         self._scheduler.put_job_token(self.queue_type)
 
         # Notify frontend
-        self._scheduler.job_completed(self, job, success)
+        self._scheduler.job_completed(self, job, element, success)
 
         self._scheduler.sched()
 
@@ -353,3 +361,16 @@ class Queue():
         context = element._get_context()
         message = Message(element._get_unique_id(), message_type, brief, **kwargs)
         context.message(message)
+
+    def _element_log_path(self, element):
+        project = element._get_project()
+        context = element._get_context()
+
+        key = element._get_display_key()[1]
+        action = self.action_name.lower()
+        logfile = "{key}-{action}.{{pid}}.log".format(key=key, action=action)
+
+        directory = os.path.join(context.logdir, project.name, element.normal_name)
+
+        os.makedirs(directory, exist_ok=True)
+        return os.path.join(directory, logfile)
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 9cbfd49..a475722 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -298,9 +298,9 @@ class Scheduler():
     # Args:
     #    job (Job): The starting Job
     #
-    def job_starting(self, job):
+    def job_starting(self, job, element):
         if self._job_start_callback:
-            self._job_start_callback(job.element, job.action_name)
+            self._job_start_callback(element, job.action_name)
 
     # job_completed():
     #
@@ -311,9 +311,9 @@ class Scheduler():
     #    job (Job): The completed Job
     #    success (bool): Whether the Job completed with a success status
     #
-    def job_completed(self, queue, job, success):
+    def job_completed(self, queue, job, element, success):
         if self._job_complete_callback:
-            self._job_complete_callback(job.element, queue, job.action_name, success)
+            self._job_complete_callback(element, queue, job.action_name, success)
 
     #######################################################
     #                  Local Private Methods              #


[buildstream] 01/04: buildstream/_scheduler/*queue.py: Move queues to a subdirectory

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8264e69142a2fef06bab0d48098a371996888505
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Apr 24 18:26:26 2018 +0100

    buildstream/_scheduler/*queue.py: Move queues to a subdirectory
---
 buildstream/_scheduler/__init__.py                | 12 ++++++------
 buildstream/_scheduler/queues/__init__.py         |  1 +
 buildstream/_scheduler/{ => queues}/buildqueue.py |  0
 buildstream/_scheduler/{ => queues}/fetchqueue.py |  2 +-
 buildstream/_scheduler/{ => queues}/pullqueue.py  |  0
 buildstream/_scheduler/{ => queues}/pushqueue.py  |  0
 buildstream/_scheduler/{ => queues}/queue.py      |  6 +++---
 buildstream/_scheduler/{ => queues}/trackqueue.py |  4 ++--
 buildstream/_scheduler/scheduler.py               |  2 +-
 9 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/buildstream/_scheduler/__init__.py b/buildstream/_scheduler/__init__.py
index 14cdebf..714bc92 100644
--- a/buildstream/_scheduler/__init__.py
+++ b/buildstream/_scheduler/__init__.py
@@ -18,12 +18,12 @@
 #  Authors:
 #        Tristan Van Berkom <tr...@codethink.co.uk>
 
-from .queue import Queue, QueueStatus, QueueType
+from .queues import Queue, QueueStatus, QueueType
 
-from .fetchqueue import FetchQueue
-from .trackqueue import TrackQueue
-from .buildqueue import BuildQueue
-from .pushqueue import PushQueue
-from .pullqueue import PullQueue
+from .queues.fetchqueue import FetchQueue
+from .queues.trackqueue import TrackQueue
+from .queues.buildqueue import BuildQueue
+from .queues.pushqueue import PushQueue
+from .queues.pullqueue import PullQueue
 
 from .scheduler import Scheduler, SchedStatus
diff --git a/buildstream/_scheduler/queues/__init__.py b/buildstream/_scheduler/queues/__init__.py
new file mode 100644
index 0000000..b9acef1
--- /dev/null
+++ b/buildstream/_scheduler/queues/__init__.py
@@ -0,0 +1 @@
+from .queue import Queue, QueueStatus, QueueType
diff --git a/buildstream/_scheduler/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
similarity index 100%
rename from buildstream/_scheduler/buildqueue.py
rename to buildstream/_scheduler/queues/buildqueue.py
diff --git a/buildstream/_scheduler/fetchqueue.py b/buildstream/_scheduler/queues/fetchqueue.py
similarity index 98%
rename from buildstream/_scheduler/fetchqueue.py
rename to buildstream/_scheduler/queues/fetchqueue.py
index c2bceb2..d3b12af 100644
--- a/buildstream/_scheduler/fetchqueue.py
+++ b/buildstream/_scheduler/queues/fetchqueue.py
@@ -20,7 +20,7 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # BuildStream toplevel imports
-from .. import Consistency
+from ... import Consistency
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
diff --git a/buildstream/_scheduler/pullqueue.py b/buildstream/_scheduler/queues/pullqueue.py
similarity index 100%
rename from buildstream/_scheduler/pullqueue.py
rename to buildstream/_scheduler/queues/pullqueue.py
diff --git a/buildstream/_scheduler/pushqueue.py b/buildstream/_scheduler/queues/pushqueue.py
similarity index 100%
rename from buildstream/_scheduler/pushqueue.py
rename to buildstream/_scheduler/queues/pushqueue.py
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queues/queue.py
similarity index 98%
rename from buildstream/_scheduler/queue.py
rename to buildstream/_scheduler/queues/queue.py
index 7c4ad69..e733776 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -25,11 +25,11 @@ from enum import Enum
 import traceback
 
 # Local imports
-from .job import Job
+from ..job import Job
 
 # BuildStream toplevel imports
-from .._exceptions import BstError, set_last_task_error
-from .._message import Message, MessageType
+from ..._exceptions import BstError, set_last_task_error
+from ..._message import Message, MessageType
 
 
 # Indicates the kind of activity
diff --git a/buildstream/_scheduler/trackqueue.py b/buildstream/_scheduler/queues/trackqueue.py
similarity index 97%
rename from buildstream/_scheduler/trackqueue.py
rename to buildstream/_scheduler/queues/trackqueue.py
index 2e7bc8b..a2a4e5e 100644
--- a/buildstream/_scheduler/trackqueue.py
+++ b/buildstream/_scheduler/queues/trackqueue.py
@@ -20,8 +20,8 @@
 #        Jürg Billeter <ju...@codethink.co.uk>
 
 # BuildStream toplevel imports
-from ..plugin import _plugin_lookup
-from .. import SourceError
+from ...plugin import _plugin_lookup
+from ... import SourceError
 
 # Local imports
 from . import Queue, QueueStatus, QueueType
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 25e1e67..9cbfd49 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -27,7 +27,7 @@ import datetime
 from contextlib import contextmanager
 
 # Local imports
-from .queue import QueueType
+from .queues import QueueType
 
 
 # A decent return code for Scheduler.run()
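
The pattern in this commit is mechanical: every module moved into
queues/ now sits one package level deeper, so each relative import
gains a dot. For a hypothetical new queue module added after this
change, the imports would look like this (myqueue.py is illustrative
only):

# buildstream/_scheduler/queues/myqueue.py

# BuildStream toplevel imports - three dots now reach the buildstream package
from ...plugin import _plugin_lookup

# Local imports - the queue machinery re-exported by queues/__init__.py
from . import Queue, QueueStatus, QueueType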


[buildstream] 03/04: buildstream/_scheduler/*: Add QueueRunner

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 8e59ca0b205b5f84e1e9a88cd169bd4168a53ded
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Fri Apr 20 15:35:25 2018 +0100

    buildstream/_scheduler/*: Add QueueRunner
---
 buildstream/_scheduler/queuerunner.py | 104 ++++++++++++++++++++++++++++++++++
 buildstream/_scheduler/scheduler.py   |  35 ++----------
 2 files changed, 109 insertions(+), 30 deletions(-)

diff --git a/buildstream/_scheduler/queuerunner.py b/buildstream/_scheduler/queuerunner.py
new file mode 100644
index 0000000..aac2e40
--- /dev/null
+++ b/buildstream/_scheduler/queuerunner.py
@@ -0,0 +1,104 @@
+#  Copyright (C) 2018 Codethink Limited
+#
+#  This program is free software; you can redistribute it and/or
+#  modify it under the terms of the GNU Lesser General Public
+#  License as published by the Free Software Foundation; either
+#  version 2 of the License, or (at your option) any later version.
+#
+#  This library is distributed in the hope that it will be useful,
+#  but WITHOUT ANY WARRANTY; without even the implied warranty of
+#  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+#  Lesser General Public License for more details.
+#
+#  You should have received a copy of the GNU Lesser General Public
+#  License along with this library. If not, see <http://www.gnu.org/licenses/>.
+#
+#  Author:
+#        Tristan Daniël Maat <tr...@codethink.co.uk>
+#
+from itertools import chain
+
+
+# QueueRunner()
+#
+# The queue runner manages queues and determines what jobs should be
+# run at any given point.
+#
+# `QueueRunner.schedule_jobs` will pull elements through queues as
+# appropriate, and return jobs for any "ready" elements to submit for
+# processing.
+#
+# Args:
+#     scheduler (Scheduler) - The scheduler to provide jobs for.
+#     queues ([Queue]) - The queues to manage.
+#
+class QueueRunner():
+    def __init__(self, scheduler, queues=None):
+        try:
+            queues = list(queues)
+        except TypeError:
+            queues = [queues] if queues is not None else []
+
+        self.queues = queues
+        self._scheduler = scheduler
+
+    # append()
+    #
+    # Append a queue to the list of queues.
+    #
+    # Args:
+    #     queue (Queue) - The queue to append
+    #
+    def append(self, queue):
+        self.queues.append(queue)
+
+    # extend()
+    #
+    # Extend the list of queues.
+    #
+    # Args:
+    #    queues (typing.Iterable[Queue]) - The queues to append
+    #
+    def extend(self, queues):
+        self.queues.extend(queues)
+
+    # schedule_jobs()
+    #
+    # Pull elements through the managed queues, and collect jobs to be
+    # executed by the scheduler
+    #
+    # Returns:
+    #    (typing.List[Job]) jobs for the scheduler to execute.
+    #
+    def schedule_jobs(self):
+
+        ready = []
+        process_queues = True
+
+        while self._scheduler._queue_jobs and process_queues:
+
+            # Pull elements forward through queues
+            elements = []
+            for queue in self.queues:
+                # Enqueue elements complete from the last queue
+                queue.enqueue(elements)
+
+                # Dequeue processed elements for the next queue
+                elements = list(queue.dequeue())
+
+            # Kick off whatever processes can be processed at this time
+            #
+            # We start by queuing from the last queue first, because we want to
+            # give priority to queues later in the scheduling process in the case
+            # that multiple queues share the same token type.
+            #
+            # This avoids starvation situations where we don't move on to fetch
+            # tasks for elements which failed to pull, and thus need all the pulls
+            # to complete before ever starting a build
+            ready.extend(chain.from_iterable(queue.process_ready() for queue in reversed(self.queues)))
+
+            # process_ready() may have skipped jobs, adding them to the done_queue.
+            # Pull these skipped elements forward to the next queue and process them.
+            process_queues = any(q.dequeue_ready() for q in self.queues)
+
+        return ready
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index a475722..61cfc11 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -28,6 +28,7 @@ from contextlib import contextmanager
 
 # Local imports
 from .queues import QueueType
+from .queuerunner import QueueRunner
 
 
 # A decent return code for Scheduler.run()
@@ -82,6 +83,7 @@ class Scheduler():
         #
         # Private members
         #
+        self._runners = []
         self._interrupt_callback = interrupt_callback
         self._ticker_callback = ticker_callback
         self._job_start_callback = job_start_callback
@@ -115,6 +117,7 @@ class Scheduler():
     def run(self, queues):
 
         # Hold on to the queues to process
+        self._runners.append(QueueRunner(self, queues))
         self.queues = queues
 
         # Ensure that we have a fresh new event loop, in case we want
@@ -221,36 +224,8 @@ class Scheduler():
     # and process anything that is ready.
     #
     def sched(self):
-
-        process_queues = True
-
-        while self._queue_jobs and process_queues:
-
-            # Pull elements forward through queues
-            elements = []
-            for queue in self.queues:
-                # Enqueue elements complete from the last queue
-                queue.enqueue(elements)
-
-                # Dequeue processed elements for the next queue
-                elements = list(queue.dequeue())
-                elements = list(elements)
-
-            # Kickoff whatever processes can be processed at this time
-            #
-            # We start by queuing from the last queue first, because we want to
-            # give priority to queues later in the scheduling process in the case
-            # that multiple queues share the same token type.
-            #
-            # This avoids starvation situations where we dont move on to fetch
-            # tasks for elements which failed to pull, and thus need all the pulls
-            # to complete before ever starting a build
-            for queue in reversed(self.queues):
-                queue.process_ready()
-
-            # process_ready() may have skipped jobs, adding them to the done_queue.
-            # Pull these skipped elements forward to the next queue and process them.
-            process_queues = any(q.dequeue_ready() for q in self.queues)
+        for runner in self._runners:
+            runner.schedule_jobs()
 
         # If nothing's ticking, time to bail out
         ticking = 0
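
One detail worth noting in QueueRunner.__init__() above: the
try/except TypeError allows the constructor to accept an iterable of
queues, a single queue, or nothing at all. A brief illustration, with
the scheduler and queue objects as stand-ins:

runner = QueueRunner(sched)                   # no queues yet
runner.append(fetch_queue)                    # add a single queue
runner.extend([build_queue, push_queue])      # add several at once

runner = QueueRunner(sched, build_queue)      # a lone queue is wrapped in a list
runner = QueueRunner(sched, [fetch_queue, build_queue])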


[buildstream] 04/04: buildstream/_scheduler/*.py: Make job submission a queue job

Posted by no...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch 372-allow-queues-to-run-auxilliary-jobs-after-an-element-s-job-finishes
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 92a6e3464fdb29181556c2f12d0cc6cc99774ff4
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Thu Apr 26 15:48:29 2018 +0100

    buildstream/_scheduler/*.py: Make job submission a queue job
---
 buildstream/_scheduler/jobs/job.py     |  6 ++-
 buildstream/_scheduler/queues/queue.py | 23 ++++-----
 buildstream/_scheduler/scheduler.py    | 92 ++++++++++++++++------------------
 3 files changed, 58 insertions(+), 63 deletions(-)

diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 6728ae0..9335cf9 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -67,13 +67,14 @@ class Process(multiprocessing.Process):
 #
 class Job():
 
-    def __init__(self, scheduler, action_name, logfile, *, max_retries=0):
+    def __init__(self, scheduler, job_type, action_name, logfile, *, max_retries=0):
 
         #
         # Public members
         #
         self.action_name = action_name   # The action name for the Queue
-        self.child_data = None
+        self.child_data = None           # Data received from the child process
+        self.job_type = job_type         # The type of the job
 
         #
         # Private members
@@ -540,6 +541,7 @@ class Job():
             return
 
         self._parent_complete(returncode == 0, self._result)
+        self._scheduler.job_completed(self)
 
     # _parent_process_envelope()
     #
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 648bebb..434ca87 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -78,7 +78,6 @@ class Queue():
         #
         # Public members
         #
-        self.active_jobs = []          # List of active ongoing Jobs, for scheduler observation
         self.failed_elements = []      # List of failed elements, for the frontend
         self.processed_elements = []   # List of processed elements, for the frontend
         self.skipped_elements = []     # List of skipped elements, for the frontend
@@ -224,6 +223,7 @@ class Queue():
     def process_ready(self):
         scheduler = self._scheduler
         unready = []
+        ready = []
 
         while self._wait_queue and scheduler.get_job_token(self.queue_type):
             element = self._wait_queue.popleft()
@@ -242,20 +242,24 @@ class Queue():
             logfile = self._element_log_path(element)
             self.prepare(element)
 
-            job = ElementJob(scheduler, self.action_name,
+            job = ElementJob(scheduler, self.queue_type,
+                             self.action_name,
                              logfile, element=element,
                              action_cb=self.process,
                              complete_cb=self._job_done,
                              max_retries=self._max_retries)
-            scheduler.job_starting(job, element)
+            ready.append(job)
 
-            job.spawn()
-            self.active_jobs.append(job)
+            # Notify the frontend
+            if self._scheduler._job_start_callback:
+                self._scheduler._job_start_callback(element, self.action_name)
 
         # These were not ready but were in the beginning, give em
         # first priority again next time around
         self._wait_queue.extendleft(unready)
 
+        return ready
+
     #####################################################
     #                 Private Methods                   #
     #####################################################
@@ -299,9 +303,6 @@ class Queue():
     #
     def _job_done(self, job, element, success, result):
 
-        # Remove from our jobs
-        self.active_jobs.remove(job)
-
         # Update workspaces in the main task before calling any queue implementation
         self._update_workspaces(element, job)
 
@@ -347,13 +348,11 @@ class Queue():
                 self.failed_elements.append(element)
 
         # Give the token for this job back to the scheduler
-        # immediately before invoking another round of scheduling
         self._scheduler.put_job_token(self.queue_type)
 
         # Notify frontend
-        self._scheduler.job_completed(self, job, element, success)
-
-        self._scheduler.sched()
+        if self._scheduler._job_complete_callback:
+            self._scheduler._job_complete_callback(element, self, job.action_name, success)
 
     # Convenience wrapper for Queue implementations to send
     # a message for the element they are processing
diff --git a/buildstream/_scheduler/scheduler.py b/buildstream/_scheduler/scheduler.py
index 61cfc11..8187c7a 100644
--- a/buildstream/_scheduler/scheduler.py
+++ b/buildstream/_scheduler/scheduler.py
@@ -71,8 +71,10 @@ class Scheduler():
         #
         # Public members
         #
-        self.queues = None          # Exposed for the frontend to print summaries
+        self.active_jobs = []       # Jobs currently being run in the scheduler
         self.context = context      # The Context object shared with Queues
+        self.queues = None
+        self.queue_runner = None    # The QueueRunner that delivers jobs to schedule
         self.terminated = False     # Whether the scheduler was asked to terminate or has terminated
         self.suspended = False      # Whether the scheduler is currently suspended
 
@@ -83,7 +85,6 @@ class Scheduler():
         #
         # Private members
         #
-        self._runners = []
         self._interrupt_callback = interrupt_callback
         self._ticker_callback = ticker_callback
         self._job_start_callback = job_start_callback
@@ -116,8 +117,8 @@ class Scheduler():
     #
     def run(self, queues):
 
+        self.queue_runner = QueueRunner(self, queues)
         # Hold on to the queues to process
-        self._runners.append(QueueRunner(self, queues))
         self.queues = queues
 
         # Ensure that we have a fresh new event loop, in case we want
@@ -224,17 +225,38 @@ class Scheduler():
     # and process anything that is ready.
     #
     def sched(self):
-        for runner in self._runners:
-            runner.schedule_jobs()
+        jobs = self.queue_runner.schedule_jobs()
+        self.run_jobs(jobs)
 
         # If nothing's ticking, time to bail out
-        ticking = 0
-        for queue in self.queues:
-            ticking += len(queue.active_jobs)
-
-        if ticking == 0:
+        if not self.active_jobs:
             self.loop.stop()
 
+    # run_jobs():
+    #
+    # Execute jobs and track them.
+    #
+    # Args:
+    #    jobs (typing.Iterable[Job]) - The jobs to run
+    #
+    def run_jobs(self, jobs):
+        for job in jobs:
+            job.spawn()
+            self.active_jobs.append(job)
+
+    # job_completed():
+    #
+    # Called when a Job completes
+    #
+    # Args:
+    #    job (Job): The completed Job
+    #
+    def job_completed(self, job):
+        self.active_jobs.remove(job)
+        self.sched()
+
     # get_job_token():
     #
     # Used by the Queue object to obtain a token for
@@ -266,30 +288,6 @@ class Scheduler():
     def put_job_token(self, queue_type):
         self._job_tokens[queue_type] += 1
 
-    # job_starting():
-    #
-    # Called by the Queue when starting a Job
-    #
-    # Args:
-    #    job (Job): The starting Job
-    #
-    def job_starting(self, job, element):
-        if self._job_start_callback:
-            self._job_start_callback(element, job.action_name)
-
-    # job_completed():
-    #
-    # Called by the Queue when a Job completes
-    #
-    # Args:
-    #    queue (Queue): The Queue holding a complete job
-    #    job (Job): The completed Job
-    #    success (bool): Whether the Job completed with a success status
-    #
-    def job_completed(self, queue, job, element, success):
-        if self._job_complete_callback:
-            self._job_complete_callback(element, queue, job.action_name, success)
-
     #######################################################
     #                  Local Private Methods              #
     #######################################################
@@ -302,9 +300,8 @@ class Scheduler():
         if not self.suspended:
             self._suspendtime = datetime.datetime.now()
             self.suspended = True
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.suspend()
+            for job in self.active_jobs:
+                job.suspend()
 
     # _resume_jobs()
     #
@@ -312,9 +309,8 @@ class Scheduler():
     #
     def _resume_jobs(self):
         if self.suspended:
-            for queue in self.queues:
-                for job in queue.active_jobs:
-                    job.resume()
+            for job in self.active_jobs:
+                job.resume()
             self.suspended = False
             self._starttime += (datetime.datetime.now() - self._suspendtime)
             self._suspendtime = None
@@ -377,17 +373,15 @@ class Scheduler():
         wait_limit = 20.0
 
         # First tell all jobs to terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                job.terminate()
+        for job in self.active_jobs:
+            job.terminate()
 
         # Now wait for them to really terminate
-        for queue in self.queues:
-            for job in queue.active_jobs:
-                elapsed = datetime.datetime.now() - wait_start
-                timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
-                if not job.terminate_wait(timeout):
-                    job.kill()
+        for job in self.active_jobs:
+            elapsed = datetime.datetime.now() - wait_start
+            timeout = max(wait_limit - elapsed.total_seconds(), 0.0)
+            if not job.terminate_wait(timeout):
+                job.kill()
 
         self.loop.stop()
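
Taken together, the four commits leave the scheduling loop looking
roughly like the condensed sketch below. This is not a literal excerpt;
the bodies are abbreviated to show the control flow:

class Scheduler():

    # Ask the QueueRunner for ready jobs, run them, and stop the event
    # loop once nothing is left in flight.
    def sched(self):
        jobs = self.queue_runner.schedule_jobs()  # queues hand back ready ElementJobs
        self.run_jobs(jobs)                       # spawn each job, track in active_jobs
        if not self.active_jobs:
            self.loop.stop()

    # Called by a Job once its child process has exited and
    # _parent_complete() has notified the queue and the frontend.
    def job_completed(self, job):
        self.active_jobs.remove(job)
        self.sched()                              # kick off the next round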