You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:54:22 UTC

[buildstream] 02/03: Use new message handlers in favor of old helpers

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch tlater/message-helpers
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 4cda0705622faa053833f04991e2fa062be55457
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Jul 3 16:49:06 2018 +0100

    Use new message handlers in favor of old helpers
---
 buildstream/_artifactcache/artifactcache.py | 11 +--------
 buildstream/_context.py                     | 10 ++++----
 buildstream/_frontend/app.py                | 18 ++++-----------
 buildstream/_pipeline.py                    | 15 +++---------
 buildstream/_platform/linux.py              | 10 ++++----
 buildstream/_scheduler/job.py               | 36 +++++++++++++++--------------
 buildstream/_scheduler/queue.py             |  8 ++++---
 buildstream/_stream.py                      | 26 ++++++++-------------
 buildstream/plugin.py                       | 15 ++++++------
 9 files changed, 58 insertions(+), 91 deletions(-)

diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 2d745f8..e057451 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -324,15 +324,6 @@ class ArtifactCache():
     #               Local Private Methods          #
     ################################################
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self.context.message(
-            Message(None, message_type, message, **args))
-
     # _set_remotes():
     #
     # Set the list of remote caches. If project is None, the global list of
@@ -356,7 +347,7 @@ class ArtifactCache():
     #
     def _initialize_remotes(self):
         def remote_failed(url, error):
-            self._message(MessageType.WARN, "Failed to fetch remote refs from {}: {}".format(url, error))
+            self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
 
         with self.context.timed_activity("Initializing remote caches", silent_nested=True):
             self.initialize_remotes(on_failure=remote_failed)
diff --git a/buildstream/_context.py b/buildstream/_context.py
index 52dd655..72a1fd8 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -425,8 +425,7 @@ class Context():
         with _signals.suspendable(stop_time, resume_time):
             try:
                 # Push activity depth for status messages
-                message = Message(unique_id, MessageType.START, activity_name, detail=detail)
-                self.message(message)
+                self.start(activity_name, detail=detail, plugin=unique_id)
                 self._push_message_depth(silent_nested)
                 yield
 
@@ -434,15 +433,14 @@ class Context():
                 # Note the failure in status messages and reraise, the scheduler
                 # expects an error when there is an error.
                 elapsed = datetime.datetime.now() - starttime
-                message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
                 self._pop_message_depth()
-                self.message(message)
+                self.failure(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id)
                 raise
 
             elapsed = datetime.datetime.now() - starttime
-            message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
             self._pop_message_depth()
-            self.message(message)
+            self.success(activity_name, detail=detail,
+                         elapsed=elapsed, plugin=unique_id)
 
     # _push_message_depth() / _pop_message_depth()
     #
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 4675b0e..c8fb977 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -249,7 +249,7 @@ class App():
 
         # Mark the beginning of the session
         if session_name:
-            self._message(MessageType.START, session_name)
+            self.context.start(session_name)
 
         # Run the body of the session here, once everything is loaded
         try:
@@ -261,9 +261,9 @@ class App():
                 elapsed = self.stream.elapsed_time
 
                 if isinstance(e, StreamError) and e.terminated:  # pylint: disable=no-member
-                    self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
+                    self.context.warn(session_name + ' Terminated', elapsed=elapsed)
                 else:
-                    self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+                    self.context.failure(session_name, elapsed=elapsed)
 
                 if self._started:
                     self._print_summary()
@@ -274,7 +274,7 @@ class App():
         else:
             # No exceptions occurred, print session time and summary
             if session_name:
-                self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+                self.context.success(session_name, elapsed=self.stream.elapsed_time)
                 if self._started:
                     self._print_summary()
 
@@ -411,21 +411,13 @@ class App():
     #                      Local Functions                     #
     ############################################################
 
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self.context.message(
-            Message(None, message_type, message, **args))
-
     # Exception handler
     #
     def _global_exception_handler(self, etype, value, tb):
 
         # Print the regular BUG message
         formatted = "".join(traceback.format_exception(etype, value, tb))
-        self._message(MessageType.BUG, str(value),
-                      detail=formatted)
+        self.context.bug(str(value), detail=formatted)
 
         # If the scheduler has started, try to terminate all jobs gracefully,
         # otherwise exit immediately.
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 9f4504d..35ba560 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -129,7 +129,7 @@ class Pipeline():
                 for source, ref in redundant_refs
             ]
             detail += "\n".join(lines)
-            self._message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
+            self._context.warn("Ignoring redundant source references", detail=detail)
 
         # Now create element groups to match the input target groups
         elt_iter = iter(elements)
@@ -220,8 +220,8 @@ class Pipeline():
             for t in targets:
                 new_elm = t._get_source_element()
                 if new_elm != t and not silent:
-                    self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
-                                  .format(t.name, new_elm.name))
+                    self._context.info("Element '{}' redirected to '{}'"
+                                       .format(t.name, new_elm.name))
                 if new_elm not in elements:
                     elements.append(new_elm)
         elif mode == PipelineSelection.PLAN:
@@ -444,15 +444,6 @@ class Pipeline():
 
                 raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self._context.message(
-            Message(None, message_type, message, **args))
-
 
 # _Planner()
 #
diff --git a/buildstream/_platform/linux.py b/buildstream/_platform/linux.py
index fec512b..6f51266 100644
--- a/buildstream/_platform/linux.py
+++ b/buildstream/_platform/linux.py
@@ -75,9 +75,9 @@ class Linux(Platform):
             return True
 
         else:
-            context.message(
-                Message(None, MessageType.WARN,
-                        "Unable to create user namespaces with bubblewrap, resorting to fallback",
-                        detail="Some builds may not function due to lack of uid / gid 0, " +
-                        "artifacts created will not be trusted for push purposes."))
+            context.warn(
+                "Unable to create user namespaces with bubblewrap, resorting to fallback",
+                detail="Some builds may not function due to lack of uid / gid 0, " +
+                "artifacts created will not be trusted for push purposes."
+            )
             return False
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py
index cc35064..c501eb0 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/job.py
@@ -173,8 +173,8 @@ class Job():
         # First resume the job if it's suspended
         self.resume(silent=True)
 
-        self._message(self.element, MessageType.STATUS,
-                      "{} terminating".format(self.action_name))
+        self._message("{} terminating".format(self.action_name),
+                      MessageType.STATUS, self.element)
 
         # Make sure there is no garbage on the queue
         self._parent_stop_listening()
@@ -205,8 +205,9 @@ class Job():
     def kill(self):
 
         # Force kill
-        self._message(self.element, MessageType.WARN,
-                      "{} did not terminate gracefully, killing".format(self.action_name))
+        self._message("{} did not terminate gracefully, killing"
+                      .format(self.action_name), self.element,
+                      MessageType.WARN)
         utils._kill_process_tree(self._process.pid)
 
     # suspend()
@@ -215,8 +216,8 @@ class Job():
     #
     def suspend(self):
         if not self._suspended:
-            self._message(self.element, MessageType.STATUS,
-                          "{} suspending".format(self.action_name))
+            self._message("{} suspending".format(self.action_name),
+                          self.element, MessageType.STATUS)
 
             try:
                 # Use SIGTSTP so that child processes may handle and propagate
@@ -240,8 +241,8 @@ class Job():
     def resume(self, silent=False):
         if self._suspended:
             if not silent:
-                self._message(self.element, MessageType.STATUS,
-                              "{} resuming".format(self.action_name))
+                self._message("{} resuming".format(self.action_name),
+                              self.element, MessageType.STATUS)
 
             os.kill(self._process.pid, signal.SIGCONT)
             self._suspended = False
@@ -268,14 +269,15 @@ class Job():
     #    message (str): The message
     #    kwargs: Remaining Message() constructor arguments
     #
-    def _message(self, plugin, message_type, message, **kwargs):
+    def _message(self, message, message_type, plugin, **kwargs):
         args = dict(kwargs)
         args['scheduler'] = True
-        self._scheduler.context.message(
-            Message(plugin._get_unique_id(),
-                    message_type,
-                    message,
-                    **args))
+        self._scheduler.context.msg(
+            message,
+            plugin=plugin._get_unique_id(),
+            msg_type=message_type,
+            **args
+        )
 
     # _child_action()
     #
@@ -324,15 +326,15 @@ class Job():
         with _signals.suspendable(stop_time, resume_time), \
             element._logging_enabled(self.action_name) as filename:
 
-            self._message(element, MessageType.START, self.action_name, logfile=filename)
+            self._message(self.action_name, MessageType.START, element, logfile=filename)
 
             # Print the element's environment at the beginning of any element's log file.
             #
             # This should probably be omitted for non-build tasks but it's harmless here
             elt_env = element.get_environment()
             env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
-            self._message(element, MessageType.LOG,
-                          "Build environment for element {}".format(element.name),
+            self._message("Build environment for element {}".format(element.name),
+                          MessageType.LOG, element,
                           detail=env_dump, logfile=filename)
 
             try:
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index 15caf83..2225b0a 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -274,7 +274,9 @@ class Queue():
                 try:
                     workspaces.save_config()
                 except BstError as e:
-                    self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
+                    element._get_context().error("Error saving workspaces",
+                                                 detail=str(e),
+                                                 plugin=element._get_unique_id())
                 except Exception as e:   # pylint: disable=broad-except
                     self._message(element, MessageType.BUG,
                                   "Unhandled exception while saving workspaces",
@@ -351,5 +353,5 @@ class Queue():
     # a message for the element they are processing
     def _message(self, element, message_type, brief, **kwargs):
         context = element._get_context()
-        message = Message(element._get_unique_id(), message_type, brief, **kwargs)
-        context.message(message)
+        context._message(brief, plugin=element._get_unique_id(),
+                         msg_type=message_type, **kwargs)
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 5013daf..b2b1b51 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -468,7 +468,7 @@ class Stream():
                 target._open_workspace()
 
         workspaces.save_config()
-        self._message(MessageType.INFO, "Saved workspace configuration")
+        self._context.info("Saved workspace configuration")
 
     # workspace_close
     #
@@ -495,7 +495,7 @@ class Stream():
         # Delete the workspace and save the configuration
         workspaces.delete_workspace(element_name)
         workspaces.save_config()
-        self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
+        self._context.info("Closed workspace for {}".format(element_name))
 
     # workspace_reset
     #
@@ -534,8 +534,8 @@ class Stream():
 
             if soft:
                 workspace.prepared = False
-                self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
-                              .format(element.name, workspace.path))
+                self._context.info("Reset workspace state for {} at: {}"
+                                   .format(element.name, workspace.path))
                 continue
 
             with element.timed_activity("Removing workspace directory {}"
@@ -552,7 +552,8 @@ class Stream():
             with element.timed_activity("Staging sources to {}".format(workspace.path)):
                 element._open_workspace()
 
-            self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))
+            self._context.info("Reset workspace for {} at: {}"
+                               .format(element.name, workspace.path))
 
         workspaces.save_config()
 
@@ -626,7 +627,7 @@ class Stream():
         # source-bundle only supports one target
         target = self.targets[0]
 
-        self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
+        self._context.info("Bundling sources for target {}".format(target.name))
 
         # Find the correct filename for the compression algorithm
         tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -881,15 +882,6 @@ class Stream():
 
         return selected, track_selected
 
-    # _message()
-    #
-    # Local message propagator
-    #
-    def _message(self, message_type, message, **kwargs):
-        args = dict(kwargs)
-        self._context.message(
-            Message(None, message_type, message, **args))
-
     # _add_queue()
     #
     # Adds a queue to the stream
@@ -940,10 +932,10 @@ class Stream():
             for element in self.total_elements:
                 element._update_state()
         except BstError as e:
-            self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+            self._context.error("Error resolving final state", detail=str(e))
             set_last_task_error(e.domain, e.reason)
         except Exception as e:   # pylint: disable=broad-except
-            self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+            self._context.bug("Unhandled exception while resolving final state", detail=str(e))
 
         if status == SchedStatus.ERROR:
             raise StreamError()
diff --git a/buildstream/plugin.py b/buildstream/plugin.py
index 29fe2cb..3f27207 100644
--- a/buildstream/plugin.py
+++ b/buildstream/plugin.py
@@ -100,7 +100,6 @@ from weakref import WeakValueDictionary
 from . import _yaml
 from . import utils
 from ._exceptions import PluginError, ImplError
-from ._message import Message, MessageType
 
 
 class Plugin():
@@ -404,7 +403,7 @@ class Plugin():
            detail (str): An optional detailed message, can be multiline output
         """
         if self.__context.log_debug:
-            self.__message(MessageType.DEBUG, brief, detail=detail)
+            self.__context.debug(brief, detail=detail)
 
     def status(self, brief, *, detail=None):
         """Print a status message
@@ -415,7 +414,7 @@ class Plugin():
 
         Note: Status messages tell about what a plugin is currently doing
         """
-        self.__message(MessageType.STATUS, brief, detail=detail)
+        self.__context.status(brief, detail=detail)
 
     def info(self, brief, *, detail=None):
         """Print an informative message
@@ -427,7 +426,7 @@ class Plugin():
         Note: Informative messages tell the user something they might want
               to know, like if refreshing an element caused it to change.
         """
-        self.__message(MessageType.INFO, brief, detail=detail)
+        self.__context.info(brief, detail=detail)
 
     def warn(self, brief, *, detail=None):
         """Print a warning message
@@ -436,7 +435,7 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
         """
-        self.__message(MessageType.WARN, brief, detail=detail)
+        self.__context.warn(brief, detail=detail)
 
     def log(self, brief, *, detail=None):
         """Log a message into the plugin's log file
@@ -448,7 +447,7 @@ class Plugin():
            brief (str): The brief message
            detail (str): An optional detailed message, can be multiline output
         """
-        self.__message(MessageType.LOG, brief, detail=detail)
+        self.log(brief, detail=detail)
 
     @contextmanager
     def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
@@ -648,8 +647,8 @@ class Plugin():
         return (exit_code, output)
 
     def __message(self, message_type, brief, **kwargs):
-        message = Message(self.__unique_id, message_type, brief, **kwargs)
-        self.__context.message(message)
+        self.__context._message(brief, plugin=self.__unique_id,
+                                msg_type=message_type, **kwargs)
 
     def __note_command(self, output, *popenargs, **kwargs):
         workdir = os.getcwd()