Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:38:57 UTC

[buildstream] branch Qinusty/message-helpers created (now a350b43)

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a change to branch Qinusty/message-helpers
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


      at a350b43  Overhaul internal messaging API

This branch includes the following new commits:

     new a350b43  Overhaul internal messaging API

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[buildstream] 01/01: Overhaul internal messaging API

Posted by tv...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch Qinusty/message-helpers
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit a350b438e5d29eb16408228af10daf39e0989c59
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Jul 3 16:43:47 2018 +0100

    Overhaul internal messaging API
---
 buildstream/_artifactcache/artifactcache.py |  12 +---
 buildstream/_context.py                     | 106 +++++++++++++++++++++++++---
 buildstream/_frontend/app.py                |  22 +++---
 buildstream/_pipeline.py                    |  14 +---
 buildstream/_project.py                     |  14 ++--
 buildstream/_scheduler/jobs/elementjob.py   |  23 +++---
 buildstream/_scheduler/jobs/job.py          |  90 ++++++++++++-----------
 buildstream/_scheduler/queues/buildqueue.py |   8 +--
 buildstream/_scheduler/queues/queue.py      |  39 +++++-----
 buildstream/_stream.py                      |  33 ++++-----
 buildstream/plugin.py                       |  18 ++---
 11 files changed, 209 insertions(+), 170 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 8ea6c9d..956ed84 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -24,7 +24,6 @@ from collections.abc import Mapping
 
 from ..types import _KeyStrength
 from .._exceptions import ArtifactError, ImplError, LoadError, LoadErrorReason
-from .._message import Message, MessageType
 from .. import utils
 from .. import _yaml
 
@@ -589,15 +588,6 @@ class ArtifactCache():
     #               Local Private Methods          #
     ################################################
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self.context.message(
-            Message(None, message_type, message, **args))
-
     # _set_remotes():
     #
     # Set the list of remote caches. If project is None, the global list of
@@ -621,7 +611,7 @@ class ArtifactCache():
     #
     def _initialize_remotes(self):
         def remote_failed(url, error):
-            self._message(MessageType.WARN, "Failed to initialize remote {}: {}".format(url, error))
+            self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
 
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
diff --git a/buildstream/_context.py b/buildstream/_context.py
index e3c290b..5bfd897 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -27,6 +27,7 @@ from . import _cachekey
 from . import _signals
 from . import _site
 from . import _yaml
+from .plugin import Plugin
 from ._exceptions import LoadError, LoadErrorReason, BstError
 from ._message import Message, MessageType
 from ._profile import Topics, profile_start, profile_end
@@ -326,7 +327,7 @@ class Context():
     # the context.
     #
     # The message handler should have the same signature as
-    # the message() method
+    # the _send_message() method
     def set_message_handler(self, handler):
         self._message_handler = handler
 
@@ -341,15 +342,19 @@ class Context():
                 return True
         return False
 
-    # message():
+    # _send_message():
     #
-    # Proxies a message back to the caller, this is the central
+    # Proxies a message back through the message handler, this is the central
     # point through which all messages pass.
     #
     # Args:
     #    message: A Message object
     #
-    def message(self, message):
+    def _send_message(self, message):
+        # Debug messages should only be displayed when they are
+        # configured to be
+        if not self.log_debug and message.message_type == MessageType.DEBUG:
+            return
 
         # Tag message only once
         if message.depth is None:
@@ -365,6 +370,86 @@ class Context():
 
         self._message_handler(message, context=self)
 
+    # message():
+    #
+    # The global message API. Any message-sending functions should go
+    # through here. This will call `_send_message` to deliver the
+    # final message.
+    #
+    # Args:
+    #     text (str): The text of the message.
+    #
+    # Kwargs:
+    #     msg_type (MessageType): The type of the message (required).
+    #     plugin (Plugin|str|None): The plugin (i.e. an Element or
+    #                               Source subclass instance) or the
+    #                               plugin's id, identifying the sender
+    #                               of the message. If a Plugin instance
+    #                               is given, its id is determined
+    #                               automatically; if omitted, the message
+    #                               is sent without a plugin context.
+    #
+    #     For other kwargs, see `Message`.
+    #
+    def message(self, text, *, plugin=None, msg_type=None, **kwargs):
+        assert msg_type is not None
+
+        if isinstance(plugin, Plugin):
+            plugin_id = plugin._get_unique_id()
+        else:
+            plugin_id = plugin
+
+        self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))
+
+    # skipped():
+    #
+    # Produce and send a skipped message through the context.
+    #
+    def skipped(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.SKIPPED, **kwargs)
+
+    # debug():
+    #
+    # Produce and send a debug message through the context.
+    #
+    def debug(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.DEBUG, **kwargs)
+
+    # status():
+    #
+    # Produce and send a status message through the context.
+    #
+    def status(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.STATUS, **kwargs)
+
+    # info():
+    #
+    # Produce and send an info message through the context.
+    #
+    def info(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.INFO, **kwargs)
+
+    # warn():
+    #
+    # Produce and send a warning message through the context.
+    #
+    def warn(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.WARN, **kwargs)
+
+    # error():
+    #
+    # Produce and send an error message through the context.
+    #
+    def error(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.ERROR, **kwargs)
+
+    # log():
+    #
+    # Produce and send a log message through the context.
+    #
+    def log(self, text, **kwargs):
+        self.message(text, msg_type=MessageType.LOG, **kwargs)
+
     # silence()
     #
     # A context manager to silence messages, this behaves in
@@ -409,8 +494,8 @@ class Context():
         with _signals.suspendable(stop_time, resume_time):
             try:
                 # Push activity depth for status messages
-                message = Message(unique_id, MessageType.START, activity_name, detail=detail)
-                self.message(message)
+                self.message(activity_name, detail=detail, plugin=unique_id,
+                             msg_type=MessageType.START)
                 self._push_message_depth(silent_nested)
                 yield
 
@@ -418,15 +503,16 @@ class Context():
                 # Note the failure in status messages and reraise, the scheduler
                 # expects an error when there is an error.
                 elapsed = datetime.datetime.now() - starttime
-                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
                 self._pop_message_depth()
-                self.message(message)
+                self.message(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id,
+                             msg_type=MessageType.FAIL)
                 raise
 
             elapsed = datetime.datetime.now() - starttime
-            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
             self._pop_message_depth()
-            self.message(message)
+            self.message(activity_name, detail=detail,
+                         elapsed=elapsed, plugin=unique_id,
+                         msg_type=MessageType.SUCCESS)
 
     # recorded_messages()
     #
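
As a quick illustration of the Context API introduced above, here is a
minimal standalone sketch of the pattern (simplified stand-ins with the
same method names, not the real classes from buildstream/_context.py):
the typed helpers such as warn() and info() all funnel into message(),
which builds the Message and hands it to _send_message(), the single
point where DEBUG filtering and dispatch to the frontend handler happen.

    # Simplified stand-ins mirroring the API above; not the real BuildStream
    # classes, just enough to show how the helpers funnel into one place.
    from enum import Enum


    class MessageType(Enum):
        DEBUG = "debug"
        INFO = "info"
        WARN = "warning"


    class Message:
        def __init__(self, plugin_id, message_type, text, *, detail=None, **kwargs):
            self.plugin_id = plugin_id
            self.message_type = message_type
            self.text = text
            self.detail = detail


    class Context:
        def __init__(self, *, log_debug=False):
            self.log_debug = log_debug
            self._message_handler = None

        def set_message_handler(self, handler):
            self._message_handler = handler

        # Central dispatch point: every message passes through here
        def _send_message(self, message):
            if not self.log_debug and message.message_type == MessageType.DEBUG:
                return
            self._message_handler(message, context=self)

        # Global message API: the typed helpers below all call this
        def message(self, text, *, plugin=None, msg_type=None, **kwargs):
            assert msg_type is not None
            # The real code checks isinstance(plugin, Plugin) before taking its id
            plugin_id = getattr(plugin, "_unique_id", plugin)
            self._send_message(Message(plugin_id, msg_type, str(text), **kwargs))

        def debug(self, text, **kwargs):
            self.message(text, msg_type=MessageType.DEBUG, **kwargs)

        def info(self, text, **kwargs):
            self.message(text, msg_type=MessageType.INFO, **kwargs)

        def warn(self, text, **kwargs):
            self.message(text, msg_type=MessageType.WARN, **kwargs)


    if __name__ == "__main__":
        context = Context(log_debug=False)
        context.set_message_handler(
            lambda msg, context: print("[{}] {}".format(msg.message_type.value, msg.text)))
        context.warn("Failed to fetch remote refs from {}: {}".format("https://cache.example", "timeout"))
        context.debug("dropped: log_debug is False")

Call sites that previously built Message objects by hand (compare the
removed _message() propagators throughout this patch) reduce to one-line
calls like the warn() at the bottom of the sketch.
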
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 87db807..85e7edb 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -34,7 +34,7 @@ from .._context import Context
 from .._platform import Platform
 from .._project import Project
 from .._exceptions import BstError, StreamError, LoadError, LoadErrorReason, AppError
-from .._message import Message, MessageType, unconditional_messages
+from .._message import MessageType, unconditional_messages
 from .._stream import Stream
 from .._versions import BST_FORMAT_VERSION
 from .. import _yaml
@@ -250,7 +250,7 @@ class App():
 
         # Mark the beginning of the session
         if session_name:
-            self._message(MessageType.START, session_name)
+            self.context.message(session_name, msg_type=MessageType.START)
 
         # Run the body of the session here, once everything is loaded
         try:
@@ -262,9 +262,9 @@ class App():
                 elapsed = self.stream.elapsed_time
 
                 if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
-                    self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
+                    self.context.warn(session_name + ' Terminated', elapsed=elapsed)
                 else:
-                    self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+                    self.context.message(session_name, elapsed=elapsed, msg_type=MessageType.FAIL)
 
                     # Notify session failure
                     self._notify("{} failed".format(session_name), "{}".format(e))
@@ -282,7 +282,9 @@ class App():
         else:
             # No exceptions occurred, print session time and summary
             if session_name:
-                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                self.context.message(session_name,
+                                     elapsed=self.stream.elapsed_time,
+                                     msg_type=MessageType.SUCCESS)
                 if self._started:
                     self._print_summary()
 
@@ -428,21 +430,13 @@ class App():
         if self.interactive:
             self.notify(title, text)
 
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self.context.message(
-            Message(None, message_type, message, **args))
-
     # Exception handler
     #
     def _global_exception_handler(self, etype, value, tb):
 
         # Print the regular BUG message
         formatted = "".join(traceback.format_exception(etype, value, tb))
-        self._message(MessageType.BUG, str(value),
-                      detail=formatted)
+        self.context.message(value, detail=formatted, msg_type=MessageType.BUG)
 
         # If the scheduler has started, try to terminate all jobs gracefully,
         # otherwise exit immediately.
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 1f75b2e..c57c54f 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -24,7 +24,6 @@ import itertools
 from operator import itemgetter
 
 from ._exceptions import PipelineError
-from ._message import Message, MessageType
 from ._profile import Topics, profile_start, profile_end
 from . import Scope, Consistency
 from ._project import ProjectRefStorage
@@ -201,8 +200,8 @@ class Pipeline():
             for t in targets:
                 new_elm = t._get_source_element()
                 if new_elm != t and not silent:
-                    self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
-                                  .format(t.name, new_elm.name))
+                    self._context.info("Element '{}' redirected to '{}'"
+                                       .format(t.name, new_elm.name))
                 if new_elm not in elements:
                     elements.append(new_elm)
         elif mode == PipelineSelection.PLAN:
@@ -433,15 +432,6 @@ class Pipeline():
 
                 raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self._context.message(
-            Message(None, message_type, message, **args))
-
 
 # _Planner()
 #
diff --git a/buildstream/_project.py b/buildstream/_project.py
index 83aa1f4..7126cf7 100644
--- a/buildstream/_project.py
+++ b/buildstream/_project.py
@@ -37,7 +37,6 @@ from ._projectrefs import ProjectRefs, ProjectRefStorage
 from ._versions import BST_FORMAT_VERSION
 from ._loader import Loader
 from .element import Element
-from ._message import Message, MessageType
 from ._includes import Includes
 from ._platform import Platform
 
@@ -337,8 +336,7 @@ class Project():
                 for source, ref in redundant_refs
             ]
             detail += "\n".join(lines)
-            self._context.message(
-                Message(None, MessageType.WARN, "Ignoring redundant source references", detail=detail))
+            self._context.warn("Ignoring redundant source references", detail=detail)
 
         return elements
 
@@ -514,13 +512,9 @@ class Project():
 
         # Deprecation check
         if fail_on_overlap is not None:
-            self._context.message(
-                Message(
-                    None,
-                    MessageType.WARN,
-                    "Use of fail-on-overlap within project.conf " +
-                    "is deprecated. Consider using fatal-warnings instead."
-                )
+            self._context.warn(
+                "Use of fail-on-overlap within project.conf " +
+                "is deprecated. Consider using fatal-warnings instead."
             )
 
         # Load project.refs if it exists, this may be ignored.
diff --git a/buildstream/_scheduler/jobs/elementjob.py b/buildstream/_scheduler/jobs/elementjob.py
index 8ce5c06..864e458 100644
--- a/buildstream/_scheduler/jobs/elementjob.py
+++ b/buildstream/_scheduler/jobs/elementjob.py
@@ -18,8 +18,6 @@
 #
 from ruamel import yaml
 
-from ..._message import Message, MessageType
-
 from .job import Job
 
 
@@ -86,9 +84,8 @@ class ElementJob(Job):
         # This should probably be omitted for non-build tasks but it's harmless here
         elt_env = self._element.get_environment()
         env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-        self.message(MessageType.LOG,
-                     "Build environment for element {}".format(self._element.name),
-                     detail=env_dump)
+        self._log("Build environment for element {}".format(self._element.name),
+                  detail=env_dump, plugin=self._element, scheduler=True)
 
         # Run the action
         return self._action_cb(self._element)
@@ -96,15 +93,6 @@ class ElementJob(Job):
     def parent_complete(self, success, result):
         self._complete_cb(self, self._element, success, self._result)
 
-    def message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(self._element._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
-
     def child_process_data(self):
         data = {}
 
@@ -113,3 +101,10 @@ class ElementJob(Job):
             data['workspace'] = workspace.to_dict()
 
         return data
+
+    # _fail()
+    #
+    # Override _fail() to set the scheduler kwarg to True.
+    #
+    def _fail(self, text, **kwargs):
+        super()._fail(text, scheduler=True, **kwargs)
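
The _fail() override above relies on a small pattern: the base class
helper accepts arbitrary Message keyword arguments, and a subclass only
injects an extra default before delegating with super(). A tiny
standalone sketch of that shape (hypothetical classes, not the real
Job/ElementJob):

    class BaseJob:
        # Base helper: accepts any extra Message keyword arguments
        def _fail(self, text, **kwargs):
            print("FAIL:", text, kwargs)


    class SchedulerJob(BaseJob):
        # Override only to mark every failure as originating from the scheduler
        def _fail(self, text, **kwargs):
            super()._fail(text, scheduler=True, **kwargs)


    SchedulerJob()._fail("Try #1 failed, retrying", logfile="build.log")
    # -> FAIL: Try #1 failed, retrying {'scheduler': True, 'logfile': 'build.log'}
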
diff --git a/buildstream/_scheduler/jobs/job.py b/buildstream/_scheduler/jobs/job.py
index 60ae0d0..ce5fa45 100644
--- a/buildstream/_scheduler/jobs/job.py
+++ b/buildstream/_scheduler/jobs/job.py
@@ -32,7 +32,7 @@ import psutil
 
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import MessageType, unconditional_messages
 from ... import _signals, utils
 
 # Return code values shutdown of job handling child processes
@@ -110,6 +110,7 @@ class Job():
         # Private members
         #
         self._scheduler = scheduler            # The scheduler
+        self._context = scheduler.context      # The context, used primarily for UI messaging.
         self._queue = None                     # A message passing queue
         self._process = None                   # The Process object
         self._watcher = None                   # Child process watcher
@@ -184,7 +185,7 @@ class Job():
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
+        self._status("{} terminating".format(self.action_name))
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -217,8 +218,8 @@ class Job():
     def kill(self):
 
         # Force kill
-        self.message(MessageType.WARN,
-                     "{} did not terminate gracefully, killing".format(self.action_name))
+        self._warn("{} did not terminate gracefully, killing"
+                   .format(self.action_name))
 
         try:
             utils._kill_process_tree(self._process.pid)
@@ -233,8 +234,7 @@ class Job():
     #
     def suspend(self):
         if not self._suspended:
-            self.message(MessageType.STATUS,
-                         "{} suspending".format(self.action_name))
+            self._status("{} suspending".format(self.action_name))
 
             try:
                 # Use SIGTSTP so that child processes may handle and propagate
@@ -258,8 +258,7 @@ class Job():
     def resume(self, silent=False):
         if self._suspended:
             if not silent and not self._scheduler.terminated:
-                self.message(MessageType.STATUS,
-                             "{} resuming".format(self.action_name))
+                self._status("{} resuming".format(self.action_name))
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
@@ -324,21 +323,6 @@ class Job():
         raise ImplError("Job '{kind}' does not implement child_process()"
                         .format(kind=type(self).__name__))
 
-    # message():
-    #
-    # Logs a message, this will be logged in the task's logfile and
-    # conditionally also be sent to the frontend.
-    #
-    # Args:
-    #    message_type (MessageType): The type of message to send
-    #    message (str): The message
-    #    kwargs: Remaining Message() constructor arguments
-    #
-    def message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        args['scheduler'] = True
-        self._scheduler.context.message(Message(None, message_type, message, **args))
-
     # child_process_data()
     #
     # Abstract method to retrieve additional data that should be
@@ -365,6 +349,32 @@ class Job():
     #
     #######################################################
 
+    def _debug(self, text, **kwargs):
+        self._context.debug(text, task_id=self._task_id, **kwargs)
+
+    def _status(self, text, **kwargs):
+        self._context.status(text, task_id=self._task_id, **kwargs)
+
+    def _info(self, text, **kwargs):
+        self._context.info(text, task_id=self._task_id, **kwargs)
+
+    def _warn(self, text, **kwargs):
+        self._context.warn(text, task_id=self._task_id, **kwargs)
+
+    def _error(self, text, **kwargs):
+        self._context.error(text, task_id=self._task_id, **kwargs)
+
+    def _log(self, text, **kwargs):
+        self._context.log(text, task_id=self._task_id, **kwargs)
+
+    # _fail()
+    #
+    # Only exists for subclasses to override and add kwargs to.
+    #
+    def _fail(self, text, **kwargs):
+        self._context.message(text, task_id=self._task_id,
+                              msg_type=MessageType.FAIL, **kwargs)
+
     # _child_action()
     #
     # Perform the action in the child process, this calls the action_cb.
@@ -391,7 +401,7 @@ class Job():
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._queue = queue
-        self._scheduler.context.set_message_handler(self._child_message_handler)
+        self._context.set_message_handler(self._child_message_handler)
 
         starttime = datetime.datetime.now()
         stopped_time = None
@@ -408,17 +418,17 @@ class Job():
         # Time, log and and run the action function
         #
         with _signals.suspendable(stop_time, resume_time), \
-            self._scheduler.context.recorded_messages(self._logfile) as filename:
+            self._context.recorded_messages(self._logfile) as filename:
 
-            self.message(MessageType.START, self.action_name, logfile=filename)
+            self._context.message(self.action_name, logfile=filename,
+                                  msg_type=MessageType.START, task_id=self._task_id)
 
             try:
                 # Try the task action
                 result = self.child_process()  # pylint: disable=assignment-from-no-return
             except SkipJob as e:
                 elapsed = datetime.datetime.now() - starttime
-                self.message(MessageType.SKIPPED, str(e),
-                             elapsed=elapsed, logfile=filename)
+                self._context.skipped(e, elapsed=elapsed, logfile=filename)
 
                 # Alert parent of skip by return code
                 self._child_shutdown(RC_SKIPPED)
@@ -427,13 +437,11 @@ class Job():
                 self._retry_flag = e.temporary
 
                 if self._retry_flag and (self._tries <= self._max_retries):
-                    self.message(MessageType.FAIL,
-                                 "Try #{} failed, retrying".format(self._tries),
-                                 elapsed=elapsed, logfile=filename)
+                    self._fail("Try #{} failed, retrying".format(self._tries),
+                               elapsed=elapsed, logfile=filename)
                 else:
-                    self.message(MessageType.FAIL, str(e),
-                                 elapsed=elapsed, detail=e.detail,
-                                 logfile=filename, sandbox=e.sandbox)
+                    self._fail(e, elapsed=elapsed, detail=e.detail,
+                               logfile=filename, sandbox=e.sandbox)
 
                 self._queue.put(Envelope('child_data', self.child_process_data()))
 
@@ -453,9 +461,9 @@ class Job():
                 elapsed = datetime.datetime.now() - starttime
                 detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
 
-                self.message(MessageType.BUG, self.action_name,
-                             elapsed=elapsed, detail=detail,
-                             logfile=filename)
+                self._context.message(self.action_name, elapsed=elapsed,
+                                      detail=detail, logfile=filename,
+                                      task_id=self._task_id, msg_type=MessageType.BUG)
                 # Unhandled exceptions should permenantly fail
                 self._child_shutdown(RC_PERM_FAIL)
 
@@ -465,8 +473,10 @@ class Job():
                 self._child_send_result(result)
 
                 elapsed = datetime.datetime.now() - starttime
-                self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed,
-                             logfile=filename)
+                self._context.message(self.action_name,
+                                      elapsed=elapsed, logfile=filename,
+                                      msg_type=MessageType.SUCCESS,
+                                      task_id=self._task_id)
 
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
@@ -603,7 +613,7 @@ class Job():
         if envelope._message_type == 'message':
             # Propagate received messages from children
             # back through the context.
-            self._scheduler.context.message(envelope._message)
+            self._context._send_message(envelope._message)
         elif envelope._message_type == 'error':
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
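
The Job-level helpers added above (_debug(), _status(), _warn(),
_error(), _log(), _fail()) all follow one shape: delegate to the shared
Context while stamping the message with the job's task id so the
frontend can attribute it to the right task. A simplified stand-in (not
the real scheduler code) showing that forwarding:

    class StubContext:
        # Stand-in for Context; the real one builds and dispatches Message objects
        def status(self, text, *, task_id=None, **kwargs):
            print("[task {}] STATUS  {}".format(task_id, text))

        def warn(self, text, *, task_id=None, **kwargs):
            print("[task {}] WARNING {}".format(task_id, text))


    class Job:
        def __init__(self, context, task_id, action_name):
            self._context = context
            self._task_id = task_id
            self.action_name = action_name

        # Each helper injects the job's task id before delegating to the context
        def _status(self, text, **kwargs):
            self._context.status(text, task_id=self._task_id, **kwargs)

        def _warn(self, text, **kwargs):
            self._context.warn(text, task_id=self._task_id, **kwargs)


    job = Job(StubContext(), task_id=7, action_name="fetch")
    job._status("{} terminating".format(job.action_name))
    job._warn("{} did not terminate gracefully, killing".format(job.action_name))
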
diff --git a/buildstream/_scheduler/queues/buildqueue.py b/buildstream/_scheduler/queues/buildqueue.py
index 984a545..c02e3e5 100644
--- a/buildstream/_scheduler/queues/buildqueue.py
+++ b/buildstream/_scheduler/queues/buildqueue.py
@@ -51,10 +51,10 @@ class BuildQueue(Queue):
             self._tried.add(element)
             _, description, detail = element._get_build_result()
             logfile = element._get_build_log()
-            self._message(element, MessageType.FAIL, description,
-                          detail=detail, action_name=self.action_name,
-                          elapsed=timedelta(seconds=0),
-                          logfile=logfile)
+            self._context.message(description, msg_type=MessageType.FAIL, plugin=element,
+                                  detail=detail, action_name=self.action_name,
+                                  elapsed=timedelta(seconds=0),
+                                  logfile=logfile)
             job = ElementJob(self._scheduler, self.action_name,
                              logfile, element=element, queue=self,
                              resources=self.resources,
diff --git a/buildstream/_scheduler/queues/queue.py b/buildstream/_scheduler/queues/queue.py
index 909cebb..df51f85 100644
--- a/buildstream/_scheduler/queues/queue.py
+++ b/buildstream/_scheduler/queues/queue.py
@@ -30,7 +30,7 @@ from ..resources import ResourceType
 
 # BuildStream toplevel imports
 from ..._exceptions import BstError, set_last_task_error
-from ..._message import Message, MessageType
+from ..._message import MessageType
 
 
 # Queue status for a given element
@@ -72,6 +72,7 @@ class Queue():
         # Private members
         #
         self._scheduler = scheduler
+        self._context = scheduler.context
         self._wait_queue = deque()
         self._done_queue = deque()
         self._max_retries = 0
@@ -270,17 +271,19 @@ class Queue():
         # Handle any workspace modifications now
         #
         if workspace_dict:
-            context = element._get_context()
-            workspaces = context.get_workspaces()
+            workspaces = self._context.get_workspaces()
             if workspaces.update_workspace(element._get_full_name(), workspace_dict):
                 try:
                     workspaces.save_config()
                 except BstError as e:
-                    self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
-                except Exception as e:   # pylint: disable=broad-except
-                    self._message(element, MessageType.BUG,
-                                  "Unhandled exception while saving workspaces",
-                                  detail=traceback.format_exc())
+                    self._context.error("Error saving workspaces",
+                                        detail=str(e),
+                                        plugin=element)
+                except Exception as e:  # pylint: disable=broad-except
+                    self._context.message("Unhandled exception while saving workspaces",
+                                          msg_type=MessageType.BUG,
+                                          detail=traceback.format_exc(),
+                                          plugin=element)
 
     # _job_done()
     #
@@ -304,10 +307,10 @@ class Queue():
         try:
             self.done(job, element, result, success)
         except BstError as e:
-
             # Report error and mark as failed
             #
-            self._message(element, MessageType.ERROR, "Post processing error", detail=str(e))
+            self._context.error("Post processing error",
+                                plugin=element, detail=traceback.format_exc())
             self.failed_elements.append(element)
 
             # Treat this as a task error as it's related to a task
@@ -317,13 +320,12 @@ class Queue():
             #
             set_last_task_error(e.domain, e.reason)
 
-        except Exception as e:   # pylint: disable=broad-except
-
+        except Exception:   # pylint: disable=broad-except
             # Report unhandled exceptions and mark as failed
             #
-            self._message(element, MessageType.BUG,
-                          "Unhandled exception in post processing",
-                          detail=traceback.format_exc())
+            self._context.message("Unhandled exception in post processing",
+                                  plugin=element, msg_type=MessageType.BUG,
+                                  detail=traceback.format_exc())
             self.failed_elements.append(element)
         else:
             #
@@ -343,13 +345,6 @@ class Queue():
             else:
                 self.failed_elements.append(element)
 
-    # Convenience wrapper for Queue implementations to send
-    # a message for the element they are processing
-    def _message(self, element, message_type, brief, **kwargs):
-        context = element._get_context()
-        message = Message(element._get_unique_id(), message_type, brief, **kwargs)
-        context.message(message)
-
     def _element_log_path(self, element):
         project = element._get_project()
         key = element._get_display_key()[1]
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 6e2e8b2..67d0766 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -25,11 +25,12 @@ import stat
 import shlex
 import shutil
 import tarfile
+import traceback
 from contextlib import contextmanager
 from tempfile import TemporaryDirectory
 
 from ._exceptions import StreamError, ImplError, BstError, set_last_task_error
-from ._message import Message, MessageType
+from ._message import MessageType
 from ._scheduler import Scheduler, SchedStatus, TrackQueue, FetchQueue, BuildQueue, PullQueue, PushQueue
 from ._pipeline import Pipeline, PipelineSelection
 from . import utils, _yaml, _site
@@ -517,7 +517,7 @@ class Stream():
                 target._open_workspace()
 
         workspaces.save_config()
-        self._message(MessageType.INFO, "Saved workspace configuration")
+        self._context.info("Saved workspace configuration")
 
     # workspace_close
     #
@@ -544,7 +544,7 @@ class Stream():
         # Delete the workspace and save the configuration
         workspaces.delete_workspace(element_name)
         workspaces.save_config()
-        self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
+        self._context.info("Closed workspace for {}".format(element_name))
 
     # workspace_reset
     #
@@ -585,8 +585,8 @@ class Stream():
             workspace_path = workspace.get_absolute_path()
             if soft:
                 workspace.prepared = False
-                self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
-                              .format(element.name, workspace_path))
+                self._context.info("Reset workspace state for {} at: {}"
+                                   .format(element.name, workspace_path))
                 continue
 
             with element.timed_activity("Removing workspace directory {}"
@@ -603,9 +603,8 @@ class Stream():
             with element.timed_activity("Staging sources to {}".format(workspace_path)):
                 element._open_workspace()
 
-            self._message(MessageType.INFO,
-                          "Reset workspace for {} at: {}".format(element.name,
-                                                                 workspace_path))
+            self._context.info("Reset workspace for {} at: {}"
+                               .format(element.name, workspace_path))
 
         workspaces.save_config()
 
@@ -681,7 +680,7 @@ class Stream():
         # source-bundle only supports one target
         target = self.targets[0]
 
-        self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
+        self._context.info("Bundling sources for target {}".format(target.name))
 
         # Find the correct filename for the compression algorithm
         tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -961,15 +960,6 @@ class Stream():
 
         return selected, track_selected
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self._context.message(
-            Message(None, message_type, message, **args))
-
     # _add_queue()
     #
     # Adds a queue to the stream
@@ -1020,10 +1010,11 @@ class Stream():
             for element in self.total_elements:
                 element._update_state()
         except BstError as e:
-            self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+            self._context.error("Error resolving final state", detail=str(e))
             set_last_task_error(e.domain, e.reason)
-        except Exception as e:   # pylint: disable=broad-except
-            self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+        except Exception:  # pylint: disable=broad-except
+            self._context.message("Unhandled exception while resolving final state",
+                                  msg_type=MessageType.BUG, detail=traceback.format_exc())
 
         if status == SchedStatus.ERROR:
             raise StreamError()
diff --git a/buildstream/plugin.py b/buildstream/plugin.py
index 1b021d4..aea135e 100644
--- a/buildstream/plugin.py
+++ b/buildstream/plugin.py
@@ -117,7 +117,6 @@ from weakref import WeakValueDictionary
 from . import _yaml
 from . import utils
 from ._exceptions import PluginError, ImplError
-from ._message import Message, MessageType
 
 
 class Plugin():
@@ -464,8 +463,7 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
         """
-        if self.__context.log_debug:
-            self.__message(MessageType.DEBUG, brief, detail=detail)
+        self.__context.debug(brief, detail=detail, plugin=self)
 
     def status(self, brief, *, detail=None):
         """Print a status message
@@ -474,9 +472,9 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
 
-        Note: Status messages tell about what a plugin is currently doing
+        Note: Status messages tell the user what a plugin is currently doing
         """
-        self.__message(MessageType.STATUS, brief, detail=detail)
+        self.__context.status(brief, detail=detail, plugin=self)
 
     def info(self, brief, *, detail=None):
         """Print an informative message
@@ -488,7 +486,7 @@ class Plugin():
         Note: Informative messages tell the user something they might want
               to know, like if refreshing an element caused it to change.
         """
-        self.__message(MessageType.INFO, brief, detail=detail)
+        self.__context.info(brief, detail=detail, plugin=self)
 
     def warn(self, brief, *, detail=None, warning_token=None):
         """Print a warning message, checks warning_token against project configuration
@@ -512,7 +510,7 @@ class Plugin():
                 detail = detail if detail else ""
                 raise PluginError(message="{}\n{}".format(brief, detail), reason=warning_token)
 
-        self.__message(MessageType.WARN, brief=brief, detail=detail)
+        self.__context.warn(brief, detail=detail, plugin=self)
 
     def log(self, brief, *, detail=None):
         """Log a message into the plugin's log file
@@ -524,7 +522,7 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
         """
-        self.__message(MessageType.LOG, brief, detail=detail)
+        self.__context.log(brief, detail=detail, plugin=self)
 
     @contextmanager
     def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
@@ -746,10 +744,6 @@ class Plugin():
 
         return (exit_code, output)
 
-    def __message(self, message_type, brief, **kwargs):
-        message = Message(self.__unique_id, message_type, brief, **kwargs)
-        self.__context.message(message)
-
     def __note_command(self, output, *popenargs, **kwargs):
         workdir = kwargs.get('cwd', os.getcwd())
         command = " ".join(popenargs[0])
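
Note that the public plugin-facing messaging API is unchanged by this
patch: Plugin.debug(), status(), info(), warn() and log() keep their
signatures and now simply delegate to the corresponding Context helpers
with plugin=self. A hedged sketch of a plugin that uses them, assuming
the usual BuildStream 1.x Element plugin API is available (HelloElement
is a hypothetical example, not part of this patch):

    # Hypothetical element plugin; requires BuildStream to be installed.
    from buildstream import Element


    class HelloElement(Element):

        def configure(self, node):
            pass

        def preflight(self):
            self.status("Checking preconditions", detail="nothing to check")

        def get_unique_key(self):
            self.log("Computing cache key contribution")
            return {}

        def configure_sandbox(self, sandbox):
            pass

        def stage(self, sandbox):
            self.info("Nothing to stage for the hello element")

        def assemble(self, sandbox):
            self.warn("This element produces an empty artifact")
            return "/"


    # Plugin entry point used by BuildStream to load the element
    def setup():
        return HelloElement

Because only the plumbing underneath these methods changes, existing
third-party plugins need no modifications to pick up the new messaging
path.
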