You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:54:22 UTC
[buildstream] 02/03: Use new message handlers in favor of old
helpers
This is an automated email from the ASF dual-hosted git repository.
not-in-ldap pushed a commit to branch tlater/message-helpers
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 4cda0705622faa053833f04991e2fa062be55457
Author: Tristan Maat <tr...@codethink.co.uk>
AuthorDate: Tue Jul 3 16:49:06 2018 +0100
Use new message handlers in favor of old helpers
---
buildstream/_artifactcache/artifactcache.py | 11 +--------
buildstream/_context.py | 10 ++++----
buildstream/_frontend/app.py | 18 ++++-----------
buildstream/_pipeline.py | 15 +++---------
buildstream/_platform/linux.py | 10 ++++----
buildstream/_scheduler/job.py | 36 +++++++++++++++--------------
buildstream/_scheduler/queue.py | 8 ++++---
buildstream/_stream.py | 26 ++++++++-------------
buildstream/plugin.py | 15 ++++++------
9 files changed, 58 insertions(+), 91 deletions(-)
diff --git a/buildstream/_artifactcache/artifactcache.py b/buildstream/_artifactcache/artifactcache.py
index 2d745f8..e057451 100644
--- a/buildstream/_artifactcache/artifactcache.py
+++ b/buildstream/_artifactcache/artifactcache.py
@@ -324,15 +324,6 @@ class ArtifactCache():
# Local Private Methods #
################################################
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self.context.message(
- Message(None, message_type, message, **args))
-
# _set_remotes():
#
# Set the list of remote caches. If project is None, the global list of
@@ -356,7 +347,7 @@ class ArtifactCache():
#
def _initialize_remotes(self):
def remote_failed(url, error):
- self._message(MessageType.WARN, "Failed to fetch remote refs from {}: {}".format(url, error))
+ self.context.warn("Failed to fetch remote refs from {}: {}".format(url, error))
with self.context.timed_activity("Initializing remote caches", silent_nested=True):
self.initialize_remotes(on_failure=remote_failed)
diff --git a/buildstream/_context.py b/buildstream/_context.py
index 52dd655..72a1fd8 100644
--- a/buildstream/_context.py
+++ b/buildstream/_context.py
@@ -425,8 +425,7 @@ class Context():
with _signals.suspendable(stop_time, resume_time):
try:
# Push activity depth for status messages
- message = Message(unique_id, MessageType.START, activity_name, detail=detail)
- self.message(message)
+ self.start(activity_name, detail=detail, plugin=unique_id)
self._push_message_depth(silent_nested)
yield
@@ -434,15 +433,14 @@ class Context():
# Note the failure in status messages and reraise, the scheduler
# expects an error when there is an error.
elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.FAIL, activity_name, elapsed=elapsed)
self._pop_message_depth()
- self.message(message)
+ self.failure(activity_name, detail=detail, elapsed=elapsed, plugin=unique_id)
raise
elapsed = datetime.datetime.now() - starttime
- message = Message(unique_id, MessageType.SUCCESS, activity_name, elapsed=elapsed)
self._pop_message_depth()
- self.message(message)
+ self.success(activity_name, detail=detail,
+ elapsed=elapsed, plugin=unique_id)
# _push_message_depth() / _pop_message_depth()
#
diff --git a/buildstream/_frontend/app.py b/buildstream/_frontend/app.py
index 4675b0e..c8fb977 100644
--- a/buildstream/_frontend/app.py
+++ b/buildstream/_frontend/app.py
@@ -249,7 +249,7 @@ class App():
# Mark the beginning of the session
if session_name:
- self._message(MessageType.START, session_name)
+ self.context.start(session_name)
# Run the body of the session here, once everything is loaded
try:
@@ -261,9 +261,9 @@ class App():
elapsed = self.stream.elapsed_time
if isinstance(e, StreamError) and e.terminated: # pylint: disable=no-member
- self._message(MessageType.WARN, session_name + ' Terminated', elapsed=elapsed)
+ self.context.warn(session_name + ' Terminated', elapsed=elapsed)
else:
- self._message(MessageType.FAIL, session_name, elapsed=elapsed)
+ self.context.failure(session_name, elapsed=elapsed)
if self._started:
self._print_summary()
@@ -274,7 +274,7 @@ class App():
else:
# No exceptions occurred, print session time and summary
if session_name:
- self._message(MessageType.SUCCESS, session_name, elapsed=self.stream.elapsed_time)
+ self.context.success(session_name, elapsed=self.stream.elapsed_time)
if self._started:
self._print_summary()
@@ -411,21 +411,13 @@ class App():
# Local Functions #
############################################################
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self.context.message(
- Message(None, message_type, message, **args))
-
# Exception handler
#
def _global_exception_handler(self, etype, value, tb):
# Print the regular BUG message
formatted = "".join(traceback.format_exception(etype, value, tb))
- self._message(MessageType.BUG, str(value),
- detail=formatted)
+ self.context.bug(str(value), detail=formatted)
# If the scheduler has started, try to terminate all jobs gracefully,
# otherwise exit immediately.
diff --git a/buildstream/_pipeline.py b/buildstream/_pipeline.py
index 9f4504d..35ba560 100644
--- a/buildstream/_pipeline.py
+++ b/buildstream/_pipeline.py
@@ -129,7 +129,7 @@ class Pipeline():
for source, ref in redundant_refs
]
detail += "\n".join(lines)
- self._message(MessageType.WARN, "Ignoring redundant source references", detail=detail)
+ self._context.warn("Ignoring redundant source references", detail=detail)
# Now create element groups to match the input target groups
elt_iter = iter(elements)
@@ -220,8 +220,8 @@ class Pipeline():
for t in targets:
new_elm = t._get_source_element()
if new_elm != t and not silent:
- self._message(MessageType.INFO, "Element '{}' redirected to '{}'"
- .format(t.name, new_elm.name))
+ self._context.info("Element '{}' redirected to '{}'"
+ .format(t.name, new_elm.name))
if new_elm not in elements:
elements.append(new_elm)
elif mode == PipelineSelection.PLAN:
@@ -444,15 +444,6 @@ class Pipeline():
raise PipelineError("Untrackable sources", detail=detail, reason="untrackable-sources")
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self._context.message(
- Message(None, message_type, message, **args))
-
# _Planner()
#
diff --git a/buildstream/_platform/linux.py b/buildstream/_platform/linux.py
index fec512b..6f51266 100644
--- a/buildstream/_platform/linux.py
+++ b/buildstream/_platform/linux.py
@@ -75,9 +75,9 @@ class Linux(Platform):
return True
else:
- context.message(
- Message(None, MessageType.WARN,
- "Unable to create user namespaces with bubblewrap, resorting to fallback",
- detail="Some builds may not function due to lack of uid / gid 0, " +
- "artifacts created will not be trusted for push purposes."))
+ context.warn(
+ "Unable to create user namespaces with bubblewrap, resorting to fallback",
+ detail="Some builds may not function due to lack of uid / gid 0, " +
+ "artifacts created will not be trusted for push purposes."
+ )
return False
diff --git a/buildstream/_scheduler/job.py b/buildstream/_scheduler/job.py
index cc35064..c501eb0 100644
--- a/buildstream/_scheduler/job.py
+++ b/buildstream/_scheduler/job.py
@@ -173,8 +173,8 @@ class Job():
# First resume the job if it's suspended
self.resume(silent=True)
- self._message(self.element, MessageType.STATUS,
- "{} terminating".format(self.action_name))
+ self._message("{} terminating".format(self.action_name),
+ MessageType.STATUS, self.element)
# Make sure there is no garbage on the queue
self._parent_stop_listening()
@@ -205,8 +205,9 @@ class Job():
def kill(self):
# Force kill
- self._message(self.element, MessageType.WARN,
- "{} did not terminate gracefully, killing".format(self.action_name))
+ self._message("{} did not terminate gracefully, killing"
+ .format(self.action_name), self.element,
+ MessageType.WARN)
utils._kill_process_tree(self._process.pid)
# suspend()
@@ -215,8 +216,8 @@ class Job():
#
def suspend(self):
if not self._suspended:
- self._message(self.element, MessageType.STATUS,
- "{} suspending".format(self.action_name))
+ self._message("{} suspending".format(self.action_name),
+ self.element, MessageType.STATUS)
try:
# Use SIGTSTP so that child processes may handle and propagate
@@ -240,8 +241,8 @@ class Job():
def resume(self, silent=False):
if self._suspended:
if not silent:
- self._message(self.element, MessageType.STATUS,
- "{} resuming".format(self.action_name))
+ self._message("{} resuming".format(self.action_name),
+ self.element, MessageType.STATUS)
os.kill(self._process.pid, signal.SIGCONT)
self._suspended = False
@@ -268,14 +269,15 @@ class Job():
# message (str): The message
# kwargs: Remaining Message() constructor arguments
#
- def _message(self, plugin, message_type, message, **kwargs):
+ def _message(self, message, message_type, plugin, **kwargs):
args = dict(kwargs)
args['scheduler'] = True
- self._scheduler.context.message(
- Message(plugin._get_unique_id(),
- message_type,
- message,
- **args))
+ self._scheduler.context.msg(
+ message,
+ plugin=plugin._get_unique_id(),
+ msg_type=message_type,
+ **args
+ )
# _child_action()
#
@@ -324,15 +326,15 @@ class Job():
with _signals.suspendable(stop_time, resume_time), \
element._logging_enabled(self.action_name) as filename:
- self._message(element, MessageType.START, self.action_name, logfile=filename)
+ self._message(self.action_name, MessageType.START, element, logfile=filename)
# Print the element's environment at the beginning of any element's log file.
#
# This should probably be omitted for non-build tasks but it's harmless here
elt_env = element.get_environment()
env_dump = yaml.round_trip_dump(elt_env, default_flow_style=False, allow_unicode=True)
- self._message(element, MessageType.LOG,
- "Build environment for element {}".format(element.name),
+ self._message("Build environment for element {}".format(element.name),
+ MessageType.LOG, element,
detail=env_dump, logfile=filename)
try:
diff --git a/buildstream/_scheduler/queue.py b/buildstream/_scheduler/queue.py
index 15caf83..2225b0a 100644
--- a/buildstream/_scheduler/queue.py
+++ b/buildstream/_scheduler/queue.py
@@ -274,7 +274,9 @@ class Queue():
try:
workspaces.save_config()
except BstError as e:
- self._message(element, MessageType.ERROR, "Error saving workspaces", detail=str(e))
+ element._get_context().error("Error saving workspaces",
+ detail=str(e),
+ plugin=element._get_unique_id())
except Exception as e: # pylint: disable=broad-except
self._message(element, MessageType.BUG,
"Unhandled exception while saving workspaces",
@@ -351,5 +353,5 @@ class Queue():
# a message for the element they are processing
def _message(self, element, message_type, brief, **kwargs):
context = element._get_context()
- message = Message(element._get_unique_id(), message_type, brief, **kwargs)
- context.message(message)
+ context._message(brief, plugin=element._get_unique_id(),
+ msg_type=message_type, **kwargs)
diff --git a/buildstream/_stream.py b/buildstream/_stream.py
index 5013daf..b2b1b51 100644
--- a/buildstream/_stream.py
+++ b/buildstream/_stream.py
@@ -468,7 +468,7 @@ class Stream():
target._open_workspace()
workspaces.save_config()
- self._message(MessageType.INFO, "Saved workspace configuration")
+ self._context.info("Saved workspace configuration")
# workspace_close
#
@@ -495,7 +495,7 @@ class Stream():
# Delete the workspace and save the configuration
workspaces.delete_workspace(element_name)
workspaces.save_config()
- self._message(MessageType.INFO, "Closed workspace for {}".format(element_name))
+ self._context.info("Closed workspace for {}".format(element_name))
# workspace_reset
#
@@ -534,8 +534,8 @@ class Stream():
if soft:
workspace.prepared = False
- self._message(MessageType.INFO, "Reset workspace state for {} at: {}"
- .format(element.name, workspace.path))
+ self._context.info("Reset workspace state for {} at: {}"
+ .format(element.name, workspace.path))
continue
with element.timed_activity("Removing workspace directory {}"
@@ -552,7 +552,8 @@ class Stream():
with element.timed_activity("Staging sources to {}".format(workspace.path)):
element._open_workspace()
- self._message(MessageType.INFO, "Reset workspace for {} at: {}".format(element.name, workspace.path))
+ self._context.info("Reset workspace for {} at: {}"
+ .format(element.name, workspace.path))
workspaces.save_config()
@@ -626,7 +627,7 @@ class Stream():
# source-bundle only supports one target
target = self.targets[0]
- self._message(MessageType.INFO, "Bundling sources for target {}".format(target.name))
+ self._context.info("Bundling sources for target {}".format(target.name))
# Find the correct filename for the compression algorithm
tar_location = os.path.join(directory, target.normal_name + ".tar")
@@ -881,15 +882,6 @@ class Stream():
return selected, track_selected
- # _message()
- #
- # Local message propagator
- #
- def _message(self, message_type, message, **kwargs):
- args = dict(kwargs)
- self._context.message(
- Message(None, message_type, message, **args))
-
# _add_queue()
#
# Adds a queue to the stream
@@ -940,10 +932,10 @@ class Stream():
for element in self.total_elements:
element._update_state()
except BstError as e:
- self._message(MessageType.ERROR, "Error resolving final state", detail=str(e))
+ self._context.error("Error resolving final state", detail=str(e))
set_last_task_error(e.domain, e.reason)
except Exception as e: # pylint: disable=broad-except
- self._message(MessageType.BUG, "Unhandled exception while resolving final state", detail=str(e))
+ self._context.bug("Unhandled exception while resolving final state", detail=str(e))
if status == SchedStatus.ERROR:
raise StreamError()
diff --git a/buildstream/plugin.py b/buildstream/plugin.py
index 29fe2cb..3f27207 100644
--- a/buildstream/plugin.py
+++ b/buildstream/plugin.py
@@ -100,7 +100,6 @@ from weakref import WeakValueDictionary
from . import _yaml
from . import utils
from ._exceptions import PluginError, ImplError
-from ._message import Message, MessageType
class Plugin():
@@ -404,7 +403,7 @@ class Plugin():
detail (str): An optional detailed message, can be multiline output
"""
if self.__context.log_debug:
- self.__message(MessageType.DEBUG, brief, detail=detail)
+ self.__context.debug(brief, detail=detail)
def status(self, brief, *, detail=None):
"""Print a status message
@@ -415,7 +414,7 @@ class Plugin():
Note: Status messages tell about what a plugin is currently doing
"""
- self.__message(MessageType.STATUS, brief, detail=detail)
+ self.__context.status(brief, detail=detail)
def info(self, brief, *, detail=None):
"""Print an informative message
@@ -427,7 +426,7 @@ class Plugin():
Note: Informative messages tell the user something they might want
to know, like if refreshing an element caused it to change.
"""
- self.__message(MessageType.INFO, brief, detail=detail)
+ self.__context.info(brief, detail=detail)
def warn(self, brief, *, detail=None):
"""Print a warning message
@@ -436,7 +435,7 @@ class Plugin():
brief (str): The brief message
detail (str): An optional detailed message, can be multiline output
"""
- self.__message(MessageType.WARN, brief, detail=detail)
+ self.__context.warn(brief, detail=detail)
def log(self, brief, *, detail=None):
"""Log a message into the plugin's log file
@@ -448,7 +447,7 @@ class Plugin():
brief (str): The brief message
detail (str): An optional detailed message, can be multiline output
"""
- self.__message(MessageType.LOG, brief, detail=detail)
+ self.log(brief, detail=detail)
@contextmanager
def timed_activity(self, activity_name, *, detail=None, silent_nested=False):
@@ -648,8 +647,8 @@ class Plugin():
return (exit_code, output)
def __message(self, message_type, brief, **kwargs):
- message = Message(self.__unique_id, message_type, brief, **kwargs)
- self.__context.message(message)
+ self.__context._message(brief, plugin=self.__unique_id,
+ msg_type=message_type, **kwargs)
def __note_command(self, output, *popenargs, **kwargs):
workdir = os.getcwd()