You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 08:15:43 UTC
[buildstream] 08/13: _messenger.py: Make the messenger aware of jobs and stop having multiple
This is an automated email from the ASF dual-hosted git repository.
tvb pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 58986f982d34177f753feaf9501038e43200bd73
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 19:17:05 2020 +0000
_messenger.py: Make the messenger aware of jobs and stop having multiple
---
src/buildstream/_frontend/app.py | 8 +-
src/buildstream/_messenger.py | 245 +++++++++++++++++++--------------
src/buildstream/_scheduler/jobs/job.py | 65 ++-------
tests/testutils/context.py | 2 +-
4 files changed, 154 insertions(+), 166 deletions(-)
diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 5d49e96..88c11c1 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -34,7 +34,7 @@ from .._context import Context
from .._project import Project
from .._exceptions import BstError, StreamError, LoadError, AppError
from ..exceptions import LoadErrorReason
-from .._message import Message, MessageType, unconditional_messages
+from .._message import Message, MessageType
from .._stream import Stream
from ..types import _SchedulerErrorAction
from .. import node
@@ -791,7 +791,7 @@ class App:
#
# Handle messages from the pipeline
#
- def _message_handler(self, message, is_silenced):
+ def _message_handler(self, message):
# Drop status messages from the UI if not verbose, we'll still see
# info messages and status messages will still go to the log files.
@@ -802,10 +802,6 @@ class App:
if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.element_name is not None:
self._fail_messages[message.element_name] = message
- # Send to frontend if appropriate
- if is_silenced and (message.message_type not in unconditional_messages):
- return
-
# Format the message & cache it
text = self.logger.render(message)
self._message_text += text
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index eb3bd51..3220cb1 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -21,11 +21,11 @@ import os
import datetime
import threading
from contextlib import contextmanager
-from typing import Callable, Generator, Optional
+from typing import Callable, Generator, Optional, TextIO
from . import _signals
from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
from ._state import State, _Task
@@ -48,9 +48,86 @@ class _TimeData:
self.start_time = start_time
-class MessageHandlerCallback:
- def __call__(self, message: Message, is_silenced: bool) -> None:
- pass
+class _JobRecorder:
+ def __init__(self, action_name: str, element_key: str, log_filename: str) -> None:
+ self.action_name = action_name
+ self.element_key = element_key
+ self.log_filename = log_filename
+
+ self.log_handle: Optional[TextIO] = None
+ self.silence_scope_depth = 0
+
+ @contextmanager
+ def enable_recording(self) -> Generator["_JobRecorder", None, None]:
+ # Ensure the directory exists first
+ directory = os.path.dirname(self.log_filename)
+ os.makedirs(directory, exist_ok=True)
+
+ with open(self.log_filename, "a") as logfile:
+
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently had something happening in the I/O stack
+ # then trying to reenter the I/O stack will fire a runtime error.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ logfile.write("\n\nForcefully terminated\n")
+ logfile.flush()
+ except RuntimeError:
+ os.fsync(logfile.fileno())
+
+ self.log_handle = logfile
+
+ with _signals.terminator(flush_log):
+ yield self
+
+ # record_message()
+ #
+ # Records the message if recording is enabled
+ #
+ # Args:
+ # message (Message): The message to record
+ #
+ def record_message(self, message: Message) -> None:
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7}"
+
+ # If this message is associated with an element or source plugin, print the
+ # full element name of the instance.
+ element_name = ""
+ if message.element_name:
+ template += " {element_name}"
+ element_name = message.element_name
+
+ template += ": {message}"
+
+ detail = ""
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip("\n")
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ text = template.format(
+ timecode=timecode,
+ element_name=element_name,
+ type=message.message_type.upper(),
+ message=message.message,
+ detail=detail,
+ )
+
+ # Write to the open log file
+ assert self.log_handle is not None
+ self.log_handle.write("{}\n".format(text))
+ self.log_handle.flush()
class Messenger:
@@ -60,11 +137,11 @@ class Messenger:
self._active_simple_tasks: int = 0
self._render_status_cb: Optional[Callable[[], None]] = None
+ self._message_handler: Optional[Callable[[Message], None]] = None
+ self._global_silence_scope_depth = 0
+
self._locals = threading.local()
- self._locals.message_handler = None
- self._locals.log_handle = None
- self._locals.log_filename = None
- self._locals.silence_scope_depth = 0
+ self._locals.job = None
# set_message_handler()
#
@@ -74,8 +151,8 @@ class Messenger:
# Args:
# handler: The handler to call on message
#
- def set_message_handler(self, handler: MessageHandlerCallback) -> None:
- self._locals.message_handler = handler
+ def set_message_handler(self, handler: Callable[[Message], None]) -> None:
+ self._message_handler = handler
# set_state()
#
@@ -102,7 +179,9 @@ class Messenger:
# Returns: Whether messages are currently being silenced
#
def _silent_messages(self) -> bool:
- return self._locals.silence_scope_depth > 0
+ if self._locals.job is not None:
+ return self._locals.job.silence_scope_depth > 0
+ return self._global_silence_scope_depth > 0
# message():
#
@@ -113,15 +192,30 @@ class Messenger:
# message: A Message object
#
def message(self, message: Message) -> None:
- # If we are recording messages, dump a copy into the open log file.
- self._record_message(message)
+ job = self._locals.job
+
+ if job is not None:
+ message.action_name = job.action_name
+ message.logfile = job.log_filename
- # Send it off to the log handler (can be the frontend,
- # or it can be the child task which will propagate
- # to the frontend)
- assert self._locals.message_handler
+ # If no key has been set at this point, and the element job has
+ # a related key, set it.
+ if message.element_key is None:
+ message.element_key = job.element_key
- self._locals.message_handler(message, is_silenced=self._silent_messages())
+ # Jobs always record messages
+ self._locals.job.record_message(message)
+
+ # Don't log LOG messages from jobs
+ if message.message_type == MessageType.LOG:
+ return
+
+ # Don't forward if it is currently silent
+ if self._silent_messages() and (message.message_type not in unconditional_messages):
+ return
+
+ assert self._message_handler is not None
+ self._message_handler(message)
# silence()
#
@@ -140,12 +234,22 @@ class Messenger:
yield
return
- self._locals.silence_scope_depth += 1
+ in_job = self._locals.job is not None
+
+ if in_job:
+ self._locals.job.silence_scope_depth += 1
+ else:
+ self._global_silence_scope_depth += 1
+
try:
yield
finally:
- assert self._locals.silence_scope_depth > 0
- self._locals.silence_scope_depth -= 1
+ if in_job:
+ assert self._locals.job.silence_scope_depth > 0
+ self._locals.job.silence_scope_depth -= 1
+ else:
+ assert self._global_silence_scope_depth > 0
+ self._global_silence_scope_depth -= 1
# timed_activity()
#
@@ -254,7 +358,7 @@ class Messenger:
)
self.message(message)
- # recorded_messages()
+ # record_job()
#
# Records all messages in a log file while the context manager
# is active.
@@ -274,42 +378,20 @@ class Messenger:
# Yields: The fully qualified log filename
#
@contextmanager
- def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
+ def record_job(
+ self, action_name: str, element_key: str, filename: str, logdir: str
+ ) -> Generator[_JobRecorder, None, None]:
# We dont allow recursing in this context manager, and
# we also do not allow it in the main process.
- assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
- assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
+ assert not hasattr(self._locals, "job") or self._locals.job is None
- # Create the fully qualified logfile in the log directory,
- # appending the pid and .log extension at the end.
- self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
- self._locals.silence_scope_depth = 0
+ log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._locals.job = _JobRecorder(action_name, element_key, log_filename)
- # Ensure the directory exists first
- directory = os.path.dirname(self._locals.log_filename)
- os.makedirs(directory, exist_ok=True)
-
- with open(self._locals.log_filename, "a") as logfile:
+ with self._locals.job.enable_recording() as job:
+ yield job
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- logfile.write("\n\nForcefully terminated\n")
- logfile.flush()
- except RuntimeError:
- os.fsync(logfile.fileno())
-
- self._locals.log_handle = logfile
- with _signals.terminator(flush_log):
- yield self._locals.log_filename
-
- self._locals.log_handle = None
- self._locals.log_filename = None
+ self._locals.job = None
# get_log_handle()
#
@@ -320,7 +402,9 @@ class Messenger:
# Returns: The active logging file handle, or None
#
def get_log_handle(self) -> Optional[str]:
- return self._locals.log_handle
+ if self._locals.job is not None:
+ return self._locals.job.log_handle
+ return None
# get_log_filename()
#
@@ -331,7 +415,7 @@ class Messenger:
# Returns: The active logging filename, or None
#
def get_log_filename(self) -> str:
- return self._locals.log_filename
+ return self._locals.job.log_filename
# timed_suspendable()
#
@@ -361,55 +445,6 @@ class Messenger:
with _signals.suspendable(stop_time, resume_time):
yield timedata
- # _record_message()
- #
- # Records the message if recording is enabled
- #
- # Args:
- # message (Message): The message to record
- #
- def _record_message(self, message: Message) -> None:
-
- if self._locals.log_handle is None:
- return
-
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7}"
-
- # If this message is associated with an element or source plugin, print the
- # full element name of the instance.
- element_name = ""
- if message.element_name:
- template += " {element_name}"
- element_name = message.element_name
-
- template += ": {message}"
-
- detail = ""
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip("\n")
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- text = template.format(
- timecode=timecode,
- element_name=element_name,
- type=message.message_type.upper(),
- message=message.message,
- detail=detail,
- )
-
- # Write to the open log file
- self._locals.log_handle.write("{}\n".format(text))
- self._locals.log_handle.flush()
-
# _render_status()
#
# Calls the render status callback set in the messenger, but only if a
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index f331d3f..30308a9 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -28,7 +28,7 @@ import traceback
# BuildStream toplevel imports
from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
from ...types import FastEnum
@@ -67,7 +67,6 @@ class _Envelope:
class _MessageType(FastEnum):
- LOG_MESSAGE = 1
ERROR = 2
RESULT = 3
@@ -389,11 +388,7 @@ class Job:
if not self._listening:
return
- if envelope.message_type is _MessageType.LOG_MESSAGE:
- # Propagate received messages from children
- # back through the context.
- self._messenger.message(envelope.message)
- elif envelope.message_type is _MessageType.ERROR:
+ if envelope.message_type is _MessageType.ERROR:
# For regression tests only, save the last error domain / reason
# reported from a child task in the main process, this global state
# is currently managed in _exceptions.py
@@ -543,21 +538,20 @@ class ChildJob:
# Set the global message handler in this child
# process to forward messages to the parent process
self._pipe_w = pipe_w
- self._messenger.set_message_handler(self._child_message_handler)
# Time, log and and run the action function
#
- with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
- self._logfile, self._logdir
- ) as filename:
- self.message(MessageType.START, self.action_name, logfile=filename)
+ with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
+ self.action_name, self._message_element_key, self._logfile, self._logdir
+ ):
+ self.message(MessageType.START, self.action_name)
try:
# Try the task action
result = self.child_process() # pylint: disable=assignment-from-no-return
except SkipJob as e:
elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
+ self.message(MessageType.SKIPPED, str(e), elapsed=elapsed)
# Alert parent of skip by return code
return _ReturnCode.SKIPPED
@@ -567,15 +561,10 @@ class ChildJob:
if retry_flag and (self._tries <= self._max_retries):
self.message(
- MessageType.FAIL,
- "Try #{} failed, retrying".format(self._tries),
- elapsed=elapsed,
- logfile=filename,
+ MessageType.FAIL, "Try #{} failed, retrying".format(self._tries), elapsed=elapsed,
)
else:
- self.message(
- MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, logfile=filename, sandbox=e.sandbox
- )
+ self.message(MessageType.FAIL, str(e), elapsed=elapsed, detail=e.detail, sandbox=e.sandbox)
# Report the exception to the parent (for internal testing purposes)
self._child_send_error(e)
@@ -593,7 +582,7 @@ class ChildJob:
elapsed = datetime.datetime.now() - timeinfo.start_time
detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
- self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
+ self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail)
# Unhandled exceptions should permenantly fail
return _ReturnCode.PERM_FAIL
@@ -602,7 +591,7 @@ class ChildJob:
self._child_send_result(result)
elapsed = datetime.datetime.now() - timeinfo.start_time
- self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+ self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed)
# Shutdown needs to stay outside of the above context manager,
# make sure we dont try to handle SIGTERM while the process
@@ -660,35 +649,3 @@ class ChildJob:
def _child_send_result(self, result):
if result is not None:
self._send_message(_MessageType.RESULT, result)
-
- # _child_message_handler()
- #
- # A Context delegate for handling messages, this replaces the
- # frontend's main message handler in the context of a child task
- # and performs local logging to the local log file before sending
- # the message back to the parent process for further propagation.
- # The related element display key is added to the message for
- # widget rendering if not already set for an element childjob.
- #
- # Args:
- # message (Message): The message to log
- # is_silenced (bool) : Whether messages are silenced
- #
- def _child_message_handler(self, message, is_silenced):
-
- message.action_name = self.action_name
-
- # If no key has been set at this point, and the element job has
- # a related key, set it. This is needed for messages going
- # straight to the message handler from the child process.
- if message.element_key is None and self._message_element_key:
- message.element_key = self._message_element_key
-
- # Send to frontend if appropriate
- if is_silenced and (message.message_type not in unconditional_messages):
- return
-
- if message.message_type == MessageType.LOG:
- return
-
- self._send_message(_MessageType.LOG_MESSAGE, message)
diff --git a/tests/testutils/context.py b/tests/testutils/context.py
index 821adef..ab14c1b 100644
--- a/tests/testutils/context.py
+++ b/tests/testutils/context.py
@@ -23,7 +23,7 @@ from buildstream._context import Context
# Handle messages from the pipeline
-def _dummy_message_handler(message, is_silenced):
+def _dummy_message_handler(message):
pass