You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:13:59 UTC
[buildstream] 08/11: _messenger.py: Extract job-specific actions
into another class
This is an automated email from the ASF dual-hosted git repository.
tvb pushed a commit to branch bschubert/no-multiprocessing-bak
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit 6ea36b0f65394c54c194e6a6ade6328725979e5a
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Tue Jul 7 19:36:56 2020 +0000
_messenger.py: Extract job-specific actions into another class
This will help separating concerns and have more explicit control on
what is happening where
---
src/buildstream/_messenger.py | 276 +++++++++++++++++++++++++-----------------
1 file changed, 164 insertions(+), 112 deletions(-)
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index fc9bd72..e6456ef 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -21,7 +21,7 @@ import os
import datetime
import threading
from contextlib import contextmanager
-from typing import Callable, Generator, Optional
+from typing import Callable, Generator, Optional, TextIO
from . import _signals
from ._exceptions import BstError
@@ -53,6 +53,156 @@ class MessageHandlerCallback:
pass
+class _MessageHandler:
+ def __init__(self, callback: MessageHandlerCallback) -> None:
+ self.log_handle: Optional[TextIO] = None
+ self.log_filename: Optional[str] = None
+
+ self._callback: MessageHandlerCallback = callback
+ self._silence_scope_depth: int = 0
+
+ # message():
+ #
+ # Handle the given message
+ #
+ # Args:
+ # message: A Message object
+ #
+ def message(self, message: Message) -> None:
+ # If we are recording messages, dump a copy into the open log file.
+ self._record_message(message)
+ self._callback(message, is_silenced=self._silent_messages())
+
+ # recorded_messages()
+ #
+ # Records all messages in a log file while the context manager
+ # is active.
+ #
+ # In addition to automatically writing all messages to the
+ # specified logging file, an open file handle for process stdout
+ # and stderr will be available via the Messenger.get_log_handle() API,
+ # and the full logfile path will be available via the
+ # Messenger.get_log_filename() API.
+ #
+ # Args:
+ # filename: A logging directory relative filename, the pid and .log
+ # extension will be automatically appended
+ #
+ # logdir : The path to the log file directory.
+ #
+ # Yields: The fully qualified log filename
+ #
+ @contextmanager
+ def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
+ # We dont allow recursing in this context manager, and
+ # we also do not allow it in the main process.
+ assert self.log_handle is None
+ assert self.log_filename is None
+
+ # Create the fully qualified logfile in the log directory,
+ # appending the pid and .log extension at the end.
+ self.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+ self._silence_scope_depth = 0
+
+ # Ensure the directory exists first
+ directory = os.path.dirname(self.log_filename)
+ os.makedirs(directory, exist_ok=True)
+
+ with open(self.log_filename, "a") as logfile:
+
+ # Write one last line to the log and flush it to disk
+ def flush_log():
+
+ # If the process currently had something happening in the I/O stack
+ # then trying to reenter the I/O stack will fire a runtime error.
+ #
+ # So just try to flush as well as we can at SIGTERM time
+ try:
+ logfile.write("\n\nForcefully terminated\n")
+ logfile.flush()
+ except RuntimeError:
+ os.fsync(logfile.fileno())
+
+ self.log_handle = logfile
+ with _signals.terminator(flush_log):
+ yield self.log_filename
+
+ self.log_handle = None
+ self.log_filename = None
+
+ # silence()
+ #
+ # A context manager to silence messages, this behaves in
+ # the same way as the `silent_nested` argument of the
+ # timed_activity() context manager: all but
+ # _message.unconditional_messages will be silenced.
+ #
+ @contextmanager
+ def silence(self) -> Generator[None, None, None]:
+ self._silence_scope_depth += 1
+ try:
+ yield
+ finally:
+ assert self._silence_scope_depth > 0
+ self._silence_scope_depth -= 1
+
+ # _record_message()
+ #
+ # Records the message if recording is enabled
+ #
+ # Args:
+ # message (Message): The message to record
+ #
+ def _record_message(self, message: Message) -> None:
+
+ if self.log_handle is None:
+ return
+
+ INDENT = " "
+ EMPTYTIME = "--:--:--"
+ template = "[{timecode: <8}] {type: <7}"
+
+ # If this message is associated with an element or source plugin, print the
+ # full element name of the instance.
+ element_name = ""
+ if message.element_name:
+ template += " {element_name}"
+ element_name = message.element_name
+
+ template += ": {message}"
+
+ detail = ""
+ if message.detail is not None:
+ template += "\n\n{detail}"
+ detail = message.detail.rstrip("\n")
+ detail = INDENT + INDENT.join(detail.splitlines(True))
+
+ timecode = EMPTYTIME
+ if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+ hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
+ minutes, seconds = divmod(remainder, 60)
+ timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+ text = template.format(
+ timecode=timecode,
+ element_name=element_name,
+ type=message.message_type.upper(),
+ message=message.message,
+ detail=detail,
+ )
+
+ # Write to the open log file
+ self.log_handle.write("{}\n".format(text))
+ self.log_handle.flush()
+
+ # _silent_messages():
+ #
+ # Returns: Whether messages are currently being silenced
+ #
+ def _silent_messages(self) -> bool:
+ return self._silence_scope_depth > 0
+
+
class Messenger:
def __init__(self) -> None:
self._state: Optional[State] = None
@@ -62,9 +212,6 @@ class Messenger:
self._locals = threading.local()
self._locals.message_handler = None
- self._locals.log_handle = None
- self._locals.log_filename = None
- self._locals.silence_scope_depth = 0
# set_message_handler()
#
@@ -75,7 +222,7 @@ class Messenger:
# handler: The handler to call on message
#
def set_message_handler(self, handler: MessageHandlerCallback) -> None:
- self._locals.message_handler = handler
+ self._locals.message_handler = _MessageHandler(handler)
# set_state()
#
@@ -97,13 +244,6 @@ class Messenger:
def set_render_status_cb(self, callback: Callable[[], None]) -> None:
self._render_status_cb = callback
- # _silent_messages():
- #
- # Returns: Whether messages are currently being silenced
- #
- def _silent_messages(self) -> bool:
- return self._locals.silence_scope_depth > 0
-
# message():
#
# Proxies a message back to the caller, this is the central
@@ -113,15 +253,7 @@ class Messenger:
# message: A Message object
#
def message(self, message: Message) -> None:
- # If we are recording messages, dump a copy into the open log file.
- self._record_message(message)
-
- # Send it off to the log handler (can be the frontend,
- # or it can be the child task which will propagate
- # to the frontend)
- assert self._locals.message_handler
-
- self._locals.message_handler(message, is_silenced=self._silent_messages())
+ self._message_handler.message(message)
# silence()
#
@@ -140,12 +272,8 @@ class Messenger:
yield
return
- self._locals.silence_scope_depth += 1
- try:
+ with self._message_handler.silence():
yield
- finally:
- assert self._locals.silence_scope_depth > 0
- self._locals.silence_scope_depth -= 1
# timed_activity()
#
@@ -275,41 +403,8 @@ class Messenger:
#
@contextmanager
def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
- # We dont allow recursing in this context manager, and
- # we also do not allow it in the main process.
- assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
- assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
-
- # Create the fully qualified logfile in the log directory,
- # appending the pid and .log extension at the end.
- self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
- self._locals.silence_scope_depth = 0
-
- # Ensure the directory exists first
- directory = os.path.dirname(self._locals.log_filename)
- os.makedirs(directory, exist_ok=True)
-
- with open(self._locals.log_filename, "a") as logfile:
-
- # Write one last line to the log and flush it to disk
- def flush_log():
-
- # If the process currently had something happening in the I/O stack
- # then trying to reenter the I/O stack will fire a runtime error.
- #
- # So just try to flush as well as we can at SIGTERM time
- try:
- logfile.write("\n\nForcefully terminated\n")
- logfile.flush()
- except RuntimeError:
- os.fsync(logfile.fileno())
-
- self._locals.log_handle = logfile
- with _signals.terminator(flush_log):
- yield self._locals.log_filename
-
- self._locals.log_handle = None
- self._locals.log_filename = None
+ with self._message_handler.recorded_messages(filename, logdir) as file:
+ yield file
# get_log_handle()
#
@@ -319,8 +414,8 @@ class Messenger:
#
# Returns: The active logging file handle, or None
#
- def get_log_handle(self) -> str:
- return self._locals.log_handle
+ def get_log_handle(self) -> Optional[TextIO]:
+ return self._message_handler.log_handle
# get_log_filename()
#
@@ -331,7 +426,8 @@ class Messenger:
# Returns: The active logging filename, or None
#
def get_log_filename(self) -> str:
- return self._locals.log_filename
+ assert self._message_handler.log_filename is not None
+ return self._message_handler.log_filename
# timed_suspendable()
#
@@ -361,54 +457,10 @@ class Messenger:
with _signals.suspendable(stop_time, resume_time):
yield timedata
- # _record_message()
- #
- # Records the message if recording is enabled
- #
- # Args:
- # message (Message): The message to record
- #
- def _record_message(self, message: Message) -> None:
-
- if self._locals.log_handle is None:
- return
-
- INDENT = " "
- EMPTYTIME = "--:--:--"
- template = "[{timecode: <8}] {type: <7}"
-
- # If this message is associated with an element or source plugin, print the
- # full element name of the instance.
- element_name = ""
- if message.element_name:
- template += " {element_name}"
- element_name = message.element_name
-
- template += ": {message}"
-
- detail = ""
- if message.detail is not None:
- template += "\n\n{detail}"
- detail = message.detail.rstrip("\n")
- detail = INDENT + INDENT.join(detail.splitlines(True))
-
- timecode = EMPTYTIME
- if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
- hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
- minutes, seconds = divmod(remainder, 60)
- timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
- text = template.format(
- timecode=timecode,
- element_name=element_name,
- type=message.message_type.upper(),
- message=message.message,
- detail=detail,
- )
-
- # Write to the open log file
- self._locals.log_handle.write("{}\n".format(text))
- self._locals.log_handle.flush()
+ @property
+ def _message_handler(self) -> _MessageHandler:
+ assert self._locals.message_handler is not None, "No message handler has been set in this thread"
+ return self._locals.message_handler
# _render_status()
#