You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by tv...@apache.org on 2021/02/04 07:13:59 UTC

[buildstream] 08/11: _messenger.py: Extract job-specific actions into another class

This is an automated email from the ASF dual-hosted git repository.

tvb pushed a commit to branch bschubert/no-multiprocessing-bak
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 6ea36b0f65394c54c194e6a6ade6328725979e5a
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Tue Jul 7 19:36:56 2020 +0000

    _messenger.py: Extract job-specific actions into another class
    
    This will help separating concerns and have more explicit control on
    what is happening where
---
 src/buildstream/_messenger.py | 276 +++++++++++++++++++++++++-----------------
 1 file changed, 164 insertions(+), 112 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index fc9bd72..e6456ef 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -21,7 +21,7 @@ import os
 import datetime
 import threading
 from contextlib import contextmanager
-from typing import Callable, Generator, Optional
+from typing import Callable, Generator, Optional, TextIO
 
 from . import _signals
 from ._exceptions import BstError
@@ -53,6 +53,156 @@ class MessageHandlerCallback:
         pass
 
 
+class _MessageHandler:
+    def __init__(self, callback: MessageHandlerCallback) -> None:
+        self.log_handle: Optional[TextIO] = None
+        self.log_filename: Optional[str] = None
+
+        self._callback: MessageHandlerCallback = callback
+        self._silence_scope_depth: int = 0
+
+    # message():
+    #
+    # Handle the given message
+    #
+    # Args:
+    #    message: A Message object
+    #
+    def message(self, message: Message) -> None:
+        # If we are recording messages, dump a copy into the open log file.
+        self._record_message(message)
+        self._callback(message, is_silenced=self._silent_messages())
+
+    # recorded_messages()
+    #
+    # Records all messages in a log file while the context manager
+    # is active.
+    #
+    # In addition to automatically writing all messages to the
+    # specified logging file, an open file handle for process stdout
+    # and stderr will be available via the Messenger.get_log_handle() API,
+    # and the full logfile path will be available via the
+    # Messenger.get_log_filename() API.
+    #
+    # Args:
+    #     filename: A logging directory relative filename, the pid and .log
+    #               extension will be automatically appended
+    #
+    #     logdir : The path to the log file directory.
+    #
+    # Yields: The fully qualified log filename
+    #
+    @contextmanager
+    def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
+        # We dont allow recursing in this context manager, and
+        # we also do not allow it in the main process.
+        assert self.log_handle is None
+        assert self.log_filename is None
+
+        # Create the fully qualified logfile in the log directory,
+        # appending the pid and .log extension at the end.
+        self.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+        self._silence_scope_depth = 0
+
+        # Ensure the directory exists first
+        directory = os.path.dirname(self.log_filename)
+        os.makedirs(directory, exist_ok=True)
+
+        with open(self.log_filename, "a") as logfile:
+
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently had something happening in the I/O stack
+                # then trying to reenter the I/O stack will fire a runtime error.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    logfile.write("\n\nForcefully terminated\n")
+                    logfile.flush()
+                except RuntimeError:
+                    os.fsync(logfile.fileno())
+
+            self.log_handle = logfile
+            with _signals.terminator(flush_log):
+                yield self.log_filename
+
+            self.log_handle = None
+            self.log_filename = None
+
+    # silence()
+    #
+    # A context manager to silence messages, this behaves in
+    # the same way as the `silent_nested` argument of the
+    # timed_activity() context manager: all but
+    # _message.unconditional_messages will be silenced.
+    #
+    @contextmanager
+    def silence(self) -> Generator[None, None, None]:
+        self._silence_scope_depth += 1
+        try:
+            yield
+        finally:
+            assert self._silence_scope_depth > 0
+            self._silence_scope_depth -= 1
+
+    # _record_message()
+    #
+    # Records the message if recording is enabled
+    #
+    # Args:
+    #    message (Message): The message to record
+    #
+    def _record_message(self, message: Message) -> None:
+
+        if self.log_handle is None:
+            return
+
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7}"
+
+        # If this message is associated with an element or source plugin, print the
+        # full element name of the instance.
+        element_name = ""
+        if message.element_name:
+            template += " {element_name}"
+            element_name = message.element_name
+
+        template += ": {message}"
+
+        detail = ""
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip("\n")
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        text = template.format(
+            timecode=timecode,
+            element_name=element_name,
+            type=message.message_type.upper(),
+            message=message.message,
+            detail=detail,
+        )
+
+        # Write to the open log file
+        self.log_handle.write("{}\n".format(text))
+        self.log_handle.flush()
+
+    # _silent_messages():
+    #
+    # Returns: Whether messages are currently being silenced
+    #
+    def _silent_messages(self) -> bool:
+        return self._silence_scope_depth > 0
+
+
 class Messenger:
     def __init__(self) -> None:
         self._state: Optional[State] = None
@@ -62,9 +212,6 @@ class Messenger:
 
         self._locals = threading.local()
         self._locals.message_handler = None
-        self._locals.log_handle = None
-        self._locals.log_filename = None
-        self._locals.silence_scope_depth = 0
 
     # set_message_handler()
     #
@@ -75,7 +222,7 @@ class Messenger:
     #   handler: The handler to call on message
     #
     def set_message_handler(self, handler: MessageHandlerCallback) -> None:
-        self._locals.message_handler = handler
+        self._locals.message_handler = _MessageHandler(handler)
 
     # set_state()
     #
@@ -97,13 +244,6 @@ class Messenger:
     def set_render_status_cb(self, callback: Callable[[], None]) -> None:
         self._render_status_cb = callback
 
-    # _silent_messages():
-    #
-    # Returns: Whether messages are currently being silenced
-    #
-    def _silent_messages(self) -> bool:
-        return self._locals.silence_scope_depth > 0
-
     # message():
     #
     # Proxies a message back to the caller, this is the central
@@ -113,15 +253,7 @@ class Messenger:
     #    message: A Message object
     #
     def message(self, message: Message) -> None:
-        # If we are recording messages, dump a copy into the open log file.
-        self._record_message(message)
-
-        # Send it off to the log handler (can be the frontend,
-        # or it can be the child task which will propagate
-        # to the frontend)
-        assert self._locals.message_handler
-
-        self._locals.message_handler(message, is_silenced=self._silent_messages())
+        self._message_handler.message(message)
 
     # silence()
     #
@@ -140,12 +272,8 @@ class Messenger:
             yield
             return
 
-        self._locals.silence_scope_depth += 1
-        try:
+        with self._message_handler.silence():
             yield
-        finally:
-            assert self._locals.silence_scope_depth > 0
-            self._locals.silence_scope_depth -= 1
 
     # timed_activity()
     #
@@ -275,41 +403,8 @@ class Messenger:
     #
     @contextmanager
     def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
-        # We dont allow recursing in this context manager, and
-        # we also do not allow it in the main process.
-        assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
-        assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
-
-        # Create the fully qualified logfile in the log directory,
-        # appending the pid and .log extension at the end.
-        self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
-        self._locals.silence_scope_depth = 0
-
-        # Ensure the directory exists first
-        directory = os.path.dirname(self._locals.log_filename)
-        os.makedirs(directory, exist_ok=True)
-
-        with open(self._locals.log_filename, "a") as logfile:
-
-            # Write one last line to the log and flush it to disk
-            def flush_log():
-
-                # If the process currently had something happening in the I/O stack
-                # then trying to reenter the I/O stack will fire a runtime error.
-                #
-                # So just try to flush as well as we can at SIGTERM time
-                try:
-                    logfile.write("\n\nForcefully terminated\n")
-                    logfile.flush()
-                except RuntimeError:
-                    os.fsync(logfile.fileno())
-
-            self._locals.log_handle = logfile
-            with _signals.terminator(flush_log):
-                yield self._locals.log_filename
-
-            self._locals.log_handle = None
-            self._locals.log_filename = None
+        with self._message_handler.recorded_messages(filename, logdir) as file:
+            yield file
 
     # get_log_handle()
     #
@@ -319,8 +414,8 @@ class Messenger:
     #
     # Returns: The active logging file handle, or None
     #
-    def get_log_handle(self) -> str:
-        return self._locals.log_handle
+    def get_log_handle(self) -> Optional[TextIO]:
+        return self._message_handler.log_handle
 
     # get_log_filename()
     #
@@ -331,7 +426,8 @@ class Messenger:
     # Returns: The active logging filename, or None
     #
     def get_log_filename(self) -> str:
-        return self._locals.log_filename
+        assert self._message_handler.log_filename is not None
+        return self._message_handler.log_filename
 
     # timed_suspendable()
     #
@@ -361,54 +457,10 @@ class Messenger:
         with _signals.suspendable(stop_time, resume_time):
             yield timedata
 
-    # _record_message()
-    #
-    # Records the message if recording is enabled
-    #
-    # Args:
-    #    message (Message): The message to record
-    #
-    def _record_message(self, message: Message) -> None:
-
-        if self._locals.log_handle is None:
-            return
-
-        INDENT = "    "
-        EMPTYTIME = "--:--:--"
-        template = "[{timecode: <8}] {type: <7}"
-
-        # If this message is associated with an element or source plugin, print the
-        # full element name of the instance.
-        element_name = ""
-        if message.element_name:
-            template += " {element_name}"
-            element_name = message.element_name
-
-        template += ": {message}"
-
-        detail = ""
-        if message.detail is not None:
-            template += "\n\n{detail}"
-            detail = message.detail.rstrip("\n")
-            detail = INDENT + INDENT.join(detail.splitlines(True))
-
-        timecode = EMPTYTIME
-        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
-            minutes, seconds = divmod(remainder, 60)
-            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-        text = template.format(
-            timecode=timecode,
-            element_name=element_name,
-            type=message.message_type.upper(),
-            message=message.message,
-            detail=detail,
-        )
-
-        # Write to the open log file
-        self._locals.log_handle.write("{}\n".format(text))
-        self._locals.log_handle.flush()
+    @property
+    def _message_handler(self) -> _MessageHandler:
+        assert self._locals.message_handler is not None, "No message handler has been set in this thread"
+        return self._locals.message_handler
 
     # _render_status()
     #