You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:47:35 UTC

[buildstream] 01/01: Overhaul internal messaging API

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch Qinusty/message-helpers
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a350b438e5d29eb16408228af10daf39e0989c59
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Jul 3 16:43:47 2018 +0100

    Overhaul internal messaging API
---
 buildstream/_artifactcache/artifactcache.py |  12 +---
 buildstream/_context.py                     | 106 +++++++++++++++++++++++++---
 buildstream/_frontend/app.py                |  22 +++---
 buildstream/_pipeline.py                    |  14 +---
 buildstream/_project.py                     |  14 ++--
 buildstream/_scheduler/jobs/elementjob.py   |  23 +++---
 buildstream/_scheduler/jobs/job.py          |  90 ++++++++++++-----------
 buildstream/_scheduler/queues/buildqueue.py |   8 +--
 buildstream/_scheduler/queues/queue.py      |  39 +++++-----
 buildstream/_stream.py                      |  33 ++++-----
 buildstream/plugin.py                       |  18 ++---
 11 files changed, 209 insertions(+), 170 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 8ea6c9d..956ed84 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -24,7 +24,6 @@ from collections.abc import Mapping
 
 from ..types import _KeyStrength
 from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
-from .._message import Message, MessageType
 from .. import utils
 from .. import _yaml
 
@@ -589,15 +588,6 @@ class ArtifactCache():
     #               Local Private Methods          #
     ################################################
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self.context.message(
-            Message(None, message_type, message, **args))
-
     # _set_remotes():
     #
     # Set the list of remote caches. If project is None, the global list of
@@ -621,7 +611,7 @@ class ArtifactCache():
     #
     def _initialize_remotes(self):
         def remote_failed(url, error):
-            self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
+            self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
 
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
diff --git a/buildstream/_context.py b/buildstream/_context.py
index e3c290b..5bfd897 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -27,6 +27,7 @@ from . import _cachekey
 from . import _signals
 from . import _site
 from . import _yaml
+from .plugin import Plugin
 from ._exceptions import LoadError, LoadErrorReason, BstError
 from ._message import Message, MessageType
 from ._profile import Topics, profile_start, profile_end
@@ -326,7 +327,7 @@ class Context():
     # the context.
     #
     # The message handler should have the same signature as
-    # the message() method
+    # the _send_message() method
     def set_message_handler(self, handler):
         self._message_handler = handler
 
@@ -341,15 +342,19 @@ class Context():
                 return True
         return False
 
-    # message():
+    # _send_message():
     #
-    # Proxies a message back to the caller, this is the central
+    # Proxies a message back through the message handler; this is the central
     # point through which all messages pass.
     #
     # Args:
     #    message: A Message object
     #
-    def message(self, message):
+    def _send_message(self, message):
+        # Debug messages should only be displayed when they are
+        # configured to be
+        if not self.log_debug and message.message_type == MessageType.DEBUG:
+            return
 
         # Tag message only once
         if message.depth is None:
@@ -365,6 +370,86 @@ class Context():
 
         self._message_handler(message, context=self)
 
+    # message():
+    #
+    # The global message API. Any message-sending functions should go
+    # through here. This will call `_send_message` to deliver the
+    # final message.
+    #
+    # Args:
+    #     text (str): The text of the message.
+    #
+    # Kwargs:
+    #     msg_type (MessageType): The type of the message (required).
+    #     plugin (Plugin|str|None): The plugin (i.e. Element or Source
+    #                               subclass instance) sending the
+    #                               message, or its unique id. If a
+    #                               Plugin instance is given, its id is
+    #                               determined automatically; if
+    #                               omitted, the message is sent
+    #                               without a plugin context.
+    #
+    #    For other kwargs, see `Message`.
+    #
+    def message(self, text, *, plugin=None, msg_type=None, **kwargs):
+        assert msg_type is not None
+
+        if isinstance(plugin, Plugin):
+            plugin_id = plugin._get_unique_id()
+        else:
+            plugin_id = plugin
+
+        self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
+
+    # skipped():
+    #
+    # Produce and send a skipped message through the context.
+    #
+    def skipped(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.SKIPPED, **kwargs)
+
+    # debug():
+    #
+    # Produce and send a debug message through the context.
+    #
+    def debug(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.DEBUG, **kwargs)
+
+    # status():
+    #
+    # Produce and send a status message through the context.
+    #
+    def status(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.STATUS, **kwargs)
+
+    # info():
+    #
+    # Produce and send an info message through the context.
+    #
+    def info(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.INFO, **kwargs)
+
+    # warn():
+    #
+    # Produce and send a warning message through the context.
+    #
+    def warn(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.WARN, **kwargs)
+
+    # error():
+    #
+    # Produce and send an error message through the context.
+    #
+    def error(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.ERROR, **kwargs)
+
+    # log():
+    #
+    # Produce and send a log message through the context.
+    #
+    def log(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.LOG, **kwargs)
+
     # silence()
     #
     # A context manager to silence messages, this behaves in
@@ -409,8 +494,8 @@ class Context():
         with _signals.suspendable(stop_time, resume_time):
             try:
                 # Push activity depth for status messages
-                message = Message(unique_id, MessageType.START, activity_name, detail=detail)
-                self.message(message)
+                self.message(activity_name, detail=detail, plugin=unique_id,
+                             msg_type=MessageType.START)
                 self._push_message_depth(silent_nested)
                 yield
 
@@ -418,15 +503,16 @@ class Context():
                 # Note the failure in status messages and reraise, the scheduler
                 # expects an error when there is an error.
                 elapsed = datetime.datetime.now() - starttime
-                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
                 self._pop_message_depth()
-                self.message(message)
+                self.message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
+                             msg_type=MessageType.FAIL)
                 raise
 
             elapsed = datetime.datetime.now() - starttime
-            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
             self._pop_message_depth()
-            self.message(message)
+            self.message(activity_name, detail=detail,
+                         elapsed=elapsed, plugin=unique_id,
+                         msg_type=MessageType.SUCCESS)
 
     # recorded_messages()
     #
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 87db807..85e7edb 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -34,7 +34,7 @@ from .._context import Context
 from .._platform import Platform
 from .._project import Project
 from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
-from .._message import Message, MessageType, unconditional_messages
+from .._message import MessageType, unconditional_messages
 from .._stream import Stream
 from .._versions import BST_FORMAT_VERSION
 from .. import _yaml
@@ -250,7 +250,7 @@ class App():
 
         # Mark the beginning of the session
         if session_name:
-            self._message(MessageType.START, session_name)
+            self.context.message(session_name, msg_type=MessageType.START)
 
         # Run the body of the session here, once everything is loaded
         try:
@@ -262,9 +262,9 @@ class App():
                 elapsed = self.stream.elapsed_time
 
                 if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
-                    self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
+                    self.context.warn(session_name + ' Terminated', elapsed=elapsed)
                 else:
-                    self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+                    self.context.message(session_name, elapsed=elapsed, msg_type=MessageType.FAIL)
 
                     # Notify session failure
                     self._notify("{} failed".format(session_name), "{}".format(e))
@@ -282,7 +282,9 @@ class App():
         else:
             # No exceptions occurred, print session time and summary
             if session_name:
-                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                self.context.message(session_name,
+                                     elapsed=self.stream.elapsed_time,
+                                     msg_type=MessageType.SUCCESS)
                 if self._started:
                     self._print_summary()
 
@@ -428,21 +430,13 @@ class App():
         if self.interactive:
             self.notify(title, text)
 
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self.context.message(
-            Message(None, message_type, message, **args))
-
     # Exception handler
     #
     def _global_exception_handler(self, etype, value, tb):
 
         # Print the regular BUG message
         formatted = "".join(traceback.format_exception(etype, value, tb))
-        self._message(MessageType.BUG, str(value),
-                      detail=formatted)
+        self.context.message(value, detail=formatted, msg_type=MessageType.BUG)
 
         # If the scheduler has started, try to terminate all jobs gracefully,
         # otherwise exit immediately.
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 1f75b2e..c57c54f 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -24,7 +24,6 @@ import itertools
 from operator import itemgetter
 
 from ._exceptions import PipelineError
-from ._message import Message, MessageType
 from ._profile import Topics, profile_start, profile_end
 from . import Scope, Consistency
 from ._project import ProjectRefStorage
@@ -201,8 +200,8 @@ class Pipeline():
             for t in targets:
                 new_elm = t._get_source_element()
                 if new_elm != t and not silent:
-                    self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
-                                  .format(t.name, new_elm.name))
+                    self._context.info("Element '{}' redirected to '{}'"
+                                       .format(t.name, new_elm.name))
                 if new_elm not in elements:
                     elements.append(new_elm)
         elif mode == PipelineSelection.PLAN:
@@ -433,15 +432,6 @@ class Pipeline():
 
                 raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self._context.message(
-            Message(None, message_type, message, **args))
-
 
 # _Planner()
 #
diff --git a/buildstream/_project.py b/buildstream/_project.py
index 83aa1f4..7126cf7 100644
--- a/buildstream/_project.py
+++ b/buildstream/_project.py
@@ -37,7 +37,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage
 from ._versions import BST_FORMAT_VERSION
 from ._loader import Loader
 from .element import Element
-from ._message import Message, MessageType
 from ._includes import Includes
 from ._platform import Platform
 
@@ -337,8 +336,7 @@ class Project():
                 for source, ref in redundant_refs
             ]
             detail += "\n".join(lines)
-            self._context.message(
-                Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
+            self._context.warn("Ignoring redundant source references", detail=detail)
 
         return elements
 
@@ -514,13 +512,9 @@ class Project():
 
         # Deprecation check
         if fail_on_overlap is not None:
-            self._context.message(
-                Message(
-                    None,
-                    MessageType.WARN,
-                    "Use of fail-on-overlap within project.conf " +
-                    "is deprecated. Consider using fatal-warnings instead."
-                )
+            self._context.warn(
+                "Use of fail-on-overlap within project.conf " +
+                "is deprecated. Consider using fatal-warnings instead."
             )
 
         # Load project.refs if it exists, this may be ignored.
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c06..864e458 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -18,8 +18,6 @@
 #
 from ruamel import yaml
 
-from ..._message import Message, MessageType
-
 from .job import Job
 
 
@@ -86,9 +84,8 @@ class ElementJob(Job):
         # This should probably be omitted for non-build tasks but it's harmless here
         elt_env = self._element.get_environment()
         env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-        self.message(MessageType.LOG,
-                     "Build environment for element {}".format(self._element.name),
-                     detail=env_dump)
+        self._log("Build environment for element {}".format(self._element.name),
+                  detail=env_dump, plugin=self.element, scheduler=True)
 
         # Run the action
         return self._action_cb(self._element)
@@ -96,15 +93,6 @@ class ElementJob(Job):
     def parent_complete(self, success, result):
         self._complete_cb(self, self._element, success, self._result)
 
-    def message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(self._element._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
-
     def child_process_data(self):
         data = {}
 
@@ -113,3 +101,10 @@ class ElementJob(Job):
             data['workspace'] = workspace.to_dict()
 
         return data
+
+    # _fail()
+    #
+    # Override _fail to set scheduler kwarg to true.
+    #
+    def _fail(self, text, **kwargs):
+        super()._fail(text, scheduler=True, **kwargs)
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 60ae0d0..ce5fa45 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -32,7 +32,7 @@ import psutil
 
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import MessageType, unconditional_messages
 from ... import _signals, utils
 
 # Return code values shutdown of job handling child processes
@@ -110,6 +110,7 @@ class Job():
         # Private members
         #
         self._scheduler = scheduler            # The scheduler
+        self._context = scheduler.context      # The context, used primarily for UI messaging.
         self._queue = None                     # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
@@ -184,7 +185,7 @@ class Job():
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
+        self._status("{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -217,8 +218,8 @@ class Job():
     def kill(self):
 
         # Force kill
-        self.message(MessageType.WARN,
-                     "{} did not terminate gracefully, killing".format(self.action_name))
+        self._warn("{} did not terminate gracefully, killing"
+                   .format(self.action_name))
 
         try:
             utils._kill_process_tree(self._process.pid)
@@ -233,8 +234,7 @@ class Job():
     #
     def suspend(self):
         if not self._suspended:
-            self.message(MessageType.STATUS,
-                         "{} suspending".format(self.action_name))
+            self._status("{} suspending".format(self.action_name))
 
             try:
                 # Use SIGTSTP so that child processes may handle and propagate
@@ -258,8 +258,7 @@ class Job():
     def resume(self, silent=False):
         if self._suspended:
             if not silent and not self._scheduler.terminated:
-                self.message(MessageType.STATUS,
-                             "{} resuming".format(self.action_name))
+                self._status("{} resuming".format(self.action_name))
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
@@ -324,21 +323,6 @@ class Job():
         raise ImplError("Job '{kind}' does not implement child_process()"
                         .format(kind=type(self).__name__))
 
-    # message():
-    #
-    # Logs a message, this will be logged in the task's logfile and
-    # conditionally also be sent to the frontend.
-    #
-    # Args:
-    #    message_type (MessageType): The type of message to send
-    #    message (str): The message
-    #    kwargs: Remaining Message() constructor arguments
-    #
-    def message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(Message(None, message_type, message, **args))
-
     # child_process_data()
     #
     # Abstract method to retrieve additional data that should be
@@ -365,6 +349,32 @@ class Job():
     #
     #######################################################
 
+    def _debug(self, text, **kwargs):
+        self._context.debug(text, task_id=self._task_id, **kwargs)
+
+    def _status(self, text, **kwargs):
+        self._context.status(text, task_id=self._task_id, **kwargs)
+
+    def _info(self, text, **kwargs):
+        self._context.info(text, task_id=self._task_id, **kwargs)
+
+    def _warn(self, text, **kwargs):
+        self._context.warn(text, task_id=self._task_id, **kwargs)
+
+    def _error(self, text, **kwargs):
+        self._context.error(text, task_id=self._task_id, **kwargs)
+
+    def _log(self, text, **kwargs):
+        self._context.log(text, task_id=self._task_id, **kwargs)
+
+    # _fail()
+    #
+    # Only exists for subclasses to override and add kwargs to.
+    #
+    def _fail(self, text, **kwargs):
+        self._context.message(text, task_id=self._task_id,
+                              msg_type=MessageType.FAIL, **kwargs)
+
     # _child_action()
     #
     # Perform the action in the child process, this calls the action_cb.
@@ -391,7 +401,7 @@ class Job():
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._queue = queue
-        self._scheduler.context.set_message_handler(self._child_message_handler)
+        self._context.set_message_handler(self._child_message_handler)
 
         starttime = datetime.datetime.now()
         stopped_time = None
@@ -408,17 +418,17 @@ class Job():
         # Time, log and and run the action function
         #
         with _signals.suspendable(stop_time, resume_time), \
-            self._scheduler.context.recorded_messages(self._logfile) as filename:
+            self._context.recorded_messages(self._logfile) as filename:
 
-            self.message(MessageType.START, self.action_name, logfile=filename)
+            self._context.message(self.action_name, logfile=filename,
+                                  msg_type=MessageType.START, task_id=self._task_id)
 
             try:
                 # Try the task action
                 result = self.child_process()  # pylint: disable=assignment-from-no-return
             except SkipJob as e:
                 elapsed = datetime.datetime.now() - starttime
-                self.message(MessageType.SKIPPED, str(e),
-                             elapsed=elapsed, logfile=filename)
+                self._context.skipped(e, elapsed=elapsed, logfile=filename)
 
                 # Alert parent of skip by return code
                 self._child_shutdown(RC_SKIPPED)
@@ -427,13 +437,11 @@ class Job():
                 self._retry_flag = e.temporary
 
                 if self._retry_flag and (self._tries <= self._max_retries):
-                    self.message(MessageType.FAIL,
-                                 "Try #{} failed, retrying".format(self._tries),
-                                 elapsed=elapsed, logfile=filename)
+                    self._fail("Try #{} failed, retrying".format(self._tries),
+                               elapsed=elapsed, logfile=filename)
                 else:
-                    self.message(MessageType.FAIL, str(e),
-                                 elapsed=elapsed, detail=e.detail,
-                                 logfile=filename, sandbox=e.sandbox)
+                    self._fail(e, elapsed=elapsed, detail=e.detail,
+                               logfile=filename, sandbox=e.sandbox)
 
                 self._queue.put(Envelope('child_data', self.child_process_data()))
 
@@ -453,9 +461,9 @@ class Job():
                 elapsed = datetime.datetime.now() - starttime
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
 
-                self.message(MessageType.BUG, self.action_name,
-                             elapsed=elapsed, detail=detail,
-                             logfile=filename)
+                self._context.message(self.action_name, elapsed=elapsed,
+                                      detail=detail, logfile=filename,
+                                      task_id=self._task_id, msg_type=MessageType.BUG)
                 # Unhandled exceptions should permenantly fail
                 self._child_shutdown(RC_PERM_FAIL)
 
@@ -465,8 +473,10 @@ class Job():
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - starttime
-                self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
-                             logfile=filename)
+                self._context.message(self.action_name,
+                                      elapsed=elapsed, logfile=filename,
+                                      msg_type=MessageType.SUCCESS,
+                                      task_id=self._task_id)
 
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
@@ -603,7 +613,7 @@ class Job():
         if envelope._message_type == 'message':
             # Propagate received messages from children
             # back through the context.
-            self._scheduler.context.message(envelope._message)
+            self._context._send_message(envelope._message)
         elif envelope._message_type == 'error':
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 984a545..c02e3e5 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -51,10 +51,10 @@ class BuildQueue(Queue):
             self._tried.add(element)
             _, description, detail = element._get_build_result()
             logfile = element._get_build_log()
-            self._message(element, MessageType.FAIL, description,
-                          detail=detail, action_name=self.action_name,
-                          elapsed=timedelta(seconds=0),
-                          logfile=logfile)
+            self._context.message(description, msg_type=MessageType.FAIL, plugin=element,
+                                  detail=detail, action_name=self.action_name,
+                                  elapsed=timedelta(seconds=0),
+                                  logfile=logfile)
             job = ElementJob(self._scheduler, self.action_name,
                              logfile, element=element, queue=self,
                              resources=self.resources,
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 909cebb..df51f85 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..resources import ResourceType
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
-from ..._message import Message, MessageType
+from ..._message import MessageType
 
 
 # Queue status for a given element
@@ -72,6 +72,7 @@ class Queue():
         # Private members
         #
         self._scheduler = scheduler
+        self._context = scheduler.context
         self._wait_queue = deque()
         self._done_queue = deque()
         self._max_retries = 0
@@ -270,17 +271,19 @@ class Queue():
         # Handle any workspace modifications now
         #
         if workspace_dict:
-            context = element._get_context()
-            workspaces = context.get_workspaces()
+            workspaces = self._context.get_workspaces()
             if workspaces.update_workspace(element._get_full_name(), workspace_dict):
                 try:
                     workspaces.save_config()
                 except BstError as e:
-                    self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
-                except Exception as e:   # pylint: disable=broad-except
-                    self._message(element, MessageType.BUG,
-                                  "Unhandled exception while saving workspaces",
-                                  detail=traceback.format_exc())
+                    self._context.error("Error saving workspaces",
+                                        detail=str(e),
+                                        plugin=element)
+                except Exception as e:  # pylint: disable=broad-except
+                    self._context.message("Unhandled exception while saving workspaces",
+                                          msg_type=MessageType.BUG,
+                                          detail=traceback.format_exc(),
+                                          plugin=element)
 
     # _job_done()
     #
@@ -304,10 +307,10 @@ class Queue():
         try:
             self.done(job, element, result, success)
         except BstError as e:
-
             # Report error and mark as failed
             #
-            self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
+            self._context.error("Post processing error",
+                                plugin=element, detail=traceback.format_exc())
             self.failed_elements.append(element)
 
             # Treat this as a task error as it's related to a task
@@ -317,13 +320,12 @@ class Queue():
             #
             set_last_task_error(e.domain, e.reason)
 
-        except Exception as e:   # pylint: disable=broad-except
-
+        except Exception:   # pylint: disable=broad-except
             # Report unhandled exceptions and mark as failed
             #
-            self._message(element, MessageType.BUG,
-                          "Unhandled exception in post processing",
-                          detail=traceback.format_exc())
+            self._context.message("Unhandled exception in post processing",
+                                  plugin=element, msg_type=MessageType.BUG,
+                                  detail=traceback.format_exc())
             self.failed_elements.append(element)
         else:
             #
@@ -343,13 +345,6 @@ class Queue():
             else:
                 self.failed_elements.append(element)
 
-    # Convenience wrapper for Queue implementations to send
-    # a message for the element they are processing
-    def _message(self, element, message_type, brief, **kwargs):
-        context = element._get_context()
-        message = Message(element._get_unique_id(), message_type, brief, **kwargs)
-        context.message(message)
-
     def _element_log_path(self, element):
         project = element._get_project()
         key = element._get_display_key()[1]
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 6e2e8b2..67d0766 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -25,11 +25,11 @@ import stat
 import shlex
 import shutil
 import tarfile
+import traceback
 from contextlib import contextmanager
 from tempfile import TemporaryDirectory
 
 from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
-from ._message import Message, MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
 from ._pipeline import Pipeline, PipelineSelection
 from . import utils, _yaml, _site
@@ -517,7 +517,7 @@ class Stream():
                 target._open_workspace()
 
         workspaces.save_config()
-        self._message(MessageType.INFO, "Saved workspace configuration")
+        self._context.info("Saved workspace configuration")
 
     # workspace_close
     #
@@ -544,7 +544,7 @@ class Stream():
         # Delete the workspace and save the configuration
         workspaces.delete_workspace(element_name)
         workspaces.save_config()
-        self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
+        self._context.info("Closed workspace for {}".format(element_name))
 
     # workspace_reset
     #
@@ -585,8 +585,8 @@ class Stream():
             workspace_path = workspace.get_absolute_path()
             if soft:
                 workspace.prepared = False
-                self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
-                              .format(element.name, workspace_path))
+                self._context.info("Reset workspace state for {} at: {}"
+                                   .format(element.name, workspace_path))
                 continue
 
             with element.timed_activity("Removing workspace directory {}"
@@ -603,9 +603,8 @@ class Stream():
             with element.timed_activity("Staging sources to {}".format(workspace_path)):
                 element._open_workspace()
 
-            self._message(MessageType.INFO,
-                          "Reset workspace for {} at: {}".format(element.name,
-                                                                 workspace_path))
+            self._context.info("Reset workspace for {} at: {}"
+                               .format(element.name, workspace_path))
 
         workspaces.save_config()
 
@@ -681,7 +680,7 @@ class Stream():
         # source-bundle only supports one target
         target = self.targets[0]
 
-        self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
+        self._context.info("Bundling sources for target {}".format(target.name))
 
         # Find the correct filename for the compression algorithm
         tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -961,15 +960,6 @@ class Stream():
 
         return selected, track_selected
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self._context.message(
-            Message(None, message_type, message, **args))
-
     # _add_queue()
     #
     # Adds a queue to the stream
@@ -1020,10 +1010,11 @@ class Stream():
             for element in self.total_elements:
                 element._update_state()
         except BstError as e:
-            self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+            self._context.error("Error resolving final state", detail=e)
             set_last_task_error(e.domain, e.reason)
-        except Exception as e:   # pylint: disable=broad-except
-            self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+        except Exception as e:  # pylint: disable=broad-except
+            self._context.message("Unhandled exception while resolving final state",
+                                  detail=traceback.format_exc())
 
         if status == SchedStatus.ERROR:
             raise StreamError()
diff --git a/buildstream/plugin.py b/buildstream/plugin.py
index 1b021d4..aea135e 100644
--- a/buildstream/plugin.py
+++ b/buildstream/plugin.py
@@ -117,7 +117,6 @@ from weakref import WeakValueDictionary
 from . import _yaml
 from . import utils
 from ._exceptions import PluginError, ImplError
-from ._message import Message, MessageType
 
 
 class Plugin():
@@ -464,8 +463,7 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
         """
-        if self.__context.log_debug:
-            self.__message(MessageType.DEBUG, brief, detail=detail)
+        self.__context.debug(brief, detail=detail, plugin=self)
 
     def status(self, brief, *, detail=None):
         """Print a status message
@@ -474,9 +472,9 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
 
-        Note: Status messages tell about what a plugin is currently doing
+        Note: Status messages tell the user what a plugin is currently doing
         """
-        self.__message(MessageType.STATUS, brief, detail=detail)
+        self.__context.status(brief, detail=detail, plugin=self)
 
     def info(self, brief, *, detail=None):
         """Print an informative message
@@ -488,7 +486,7 @@ class Plugin():
         Note: Informative messages tell the user something they might want
               to know, like if refreshing an element caused it to change.
         """
-        self.__message(MessageType.INFO, brief, detail=detail)
+        self.__context.info(brief, detail=detail, plugin=self)
 
     def warn(self, brief, *, detail=None, warning_token=None):
         """Print a warning message, checks warning_token against project configuration
@@ -512,7 +510,7 @@ class Plugin():
                 detail = detail if detail else ""
                 raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
 
-        self.__message(MessageType.WARN, brief=brief, detail=detail)
+        self.__context.warn(brief, detail=detail, plugin=self)
 
     def log(self, brief, *, detail=None):
         """Log a message into the plugin's log file
@@ -524,7 +522,7 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
         """
-        self.__message(MessageType.LOG, brief, detail=detail)
+        self.__context.log(brief, detail=detail, plugin=self)
 
     @contextmanager
     def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
@@ -746,10 +744,6 @@ class Plugin():
 
         return (exit_code, output)
 
-    def __message(self, message_type, brief, **kwargs):
-        message = Message(self.__unique_id, message_type, brief, **kwargs)
-        self.__context.message(message)
-
     def __note_command(self, output, *popenargs, **kwargs):
         workdir = kwargs.get('cwd', os.getcwd())
         command = " ".join(popenargs[0])