You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:41:41 UTC

[buildstream] 09/10: _messenger.py: Make the messenger aware of jobs and stop having multiple

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch bschubert/remove-parent-child-pipe
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 2ce8239955fefbbf468e86a3b0622e6189da8faa
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 19:17:05 2020 +0000

    _messenger.py: Make the messenger aware of jobs and stop having multiple
---
 src/buildstream/_frontend/app.py       |   8 +-
 src/buildstream/_messenger.py          | 245 +++++++++++++++++++--------------
 src/buildstream/_scheduler/jobs/job.py |  56 ++------
 tests/testutils/context.py             |   2 +-
 4 files changed, 151 insertions(+), 160 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 5d49e96..88c11c1 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -34,7 +34,7 @@ from .._context import Context
 from .._project import Project
 from .._exceptions import BstError, StreamError, LoadError, AppError
 from ..exceptions import LoadErrorReason
-from .._message import Message, MessageType, unconditional_messages
+from .._message import Message, MessageType
 from .._stream import Stream
 from ..types import _SchedulerErrorAction
 from .. import node
@@ -791,7 +791,7 @@ class App:
     #
     # Handle messages from the pipeline
     #
-    def _message_handler(self, message, is_silenced):
+    def _message_handler(self, message):
 
         # Drop status messages from the UI if not verbose, we'll still see
         # info messages and status messages will still go to the log files.
@@ -802,10 +802,6 @@ class App:
         if message.message_type in [MessageType.FAIL, MessageType.BUG] and message.element_name is not None:
             self._fail_messages[message.element_name] = message
 
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
         # Format the message & cache it
         text = self.logger.render(message)
         self._message_text += text
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index eb3bd51..3220cb1 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -21,11 +21,11 @@ import os
 import datetime
 import threading
 from contextlib import contextmanager
-from typing import Callable, Generator, Optional
+from typing import Callable, Generator, Optional, TextIO
 
 from . import _signals
 from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
 from ._state import State, _Task
 
 
@@ -48,9 +48,86 @@ class _TimeData:
         self.start_time = start_time
 
 
-class MessageHandlerCallback:
-    def __call__(self, message: Message, is_silenced: bool) -> None:
-        pass
+class _JobRecorder:
+    def __init__(self, action_name: str, element_key: str, log_filename: str) -> None:
+        self.action_name = action_name
+        self.element_key = element_key
+        self.log_filename = log_filename
+
+        self.log_handle: Optional[TextIO] = None
+        self.silence_scope_depth = 0
+
+    @contextmanager
+    def enable_recording(self) -> Generator["_JobRecorder", None, None]:
+        # Ensure the directory exists first
+        directory = os.path.dirname(self.log_filename)
+        os.makedirs(directory, exist_ok=True)
+
+        with open(self.log_filename, "a") as logfile:
+
+            # Write one last line to the log and flush it to disk
+            def flush_log():
+
+                # If the process currently had something happening in the I/O stack
+                # then trying to reenter the I/O stack will fire a runtime error.
+                #
+                # So just try to flush as well as we can at SIGTERM time
+                try:
+                    logfile.write("\n\nForcefully terminated\n")
+                    logfile.flush()
+                except RuntimeError:
+                    os.fsync(logfile.fileno())
+
+            self.log_handle = logfile
+
+            with _signals.terminator(flush_log):
+                yield self
+
+    # record_message()
+    #
+    # Writes the message to the job's open log file (recording must be enabled)
+    #
+    # Args:
+    #    message (Message): The message to record
+    #
+    def record_message(self, message: Message) -> None:
+        INDENT = "    "
+        EMPTYTIME = "--:--:--"
+        template = "[{timecode: <8}] {type: <7}"
+
+        # If this message is associated with an element or source plugin, print the
+        # full element name of the instance.
+        element_name = ""
+        if message.element_name:
+            template += " {element_name}"
+            element_name = message.element_name
+
+        template += ": {message}"
+
+        detail = ""
+        if message.detail is not None:
+            template += "\n\n{detail}"
+            detail = message.detail.rstrip("\n")
+            detail = INDENT + INDENT.join(detail.splitlines(True))
+
+        timecode = EMPTYTIME
+        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
+            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
+            minutes, seconds = divmod(remainder, 60)
+            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
+
+        text = template.format(
+            timecode=timecode,
+            element_name=element_name,
+            type=message.message_type.upper(),
+            message=message.message,
+            detail=detail,
+        )
+
+        # Write to the open log file
+        assert self.log_handle is not None
+        self.log_handle.write("{}\n".format(text))
+        self.log_handle.flush()
 
 
 class Messenger:
@@ -60,11 +137,11 @@ class Messenger:
         self._active_simple_tasks: int = 0
         self._render_status_cb: Optional[Callable[[], None]] = None
 
+        self._message_handler: Optional[Callable[[Message], None]] = None
+        self._global_silence_scope_depth = 0
+
         self._locals = threading.local()
-        self._locals.message_handler = None
-        self._locals.log_handle = None
-        self._locals.log_filename = None
-        self._locals.silence_scope_depth = 0
+        self._locals.job = None
 
     # set_message_handler()
     #
@@ -74,8 +151,8 @@ class Messenger:
     # Args:
     #   handler: The handler to call on message
     #
-    def set_message_handler(self, handler: MessageHandlerCallback) -> None:
-        self._locals.message_handler = handler
+    def set_message_handler(self, handler: Callable[[Message], None]) -> None:
+        self._message_handler = handler
 
     # set_state()
     #
@@ -102,7 +179,9 @@ class Messenger:
     # Returns: Whether messages are currently being silenced
     #
     def _silent_messages(self) -> bool:
-        return self._locals.silence_scope_depth > 0
+        if self._locals.job is not None:
+            return self._locals.job.silence_scope_depth > 0
+        return self._global_silence_scope_depth > 0
 
     # message():
     #
@@ -113,15 +192,30 @@ class Messenger:
     #    message: A Message object
     #
     def message(self, message: Message) -> None:
-        # If we are recording messages, dump a copy into the open log file.
-        self._record_message(message)
+        job = self._locals.job
+
+        if job is not None:
+            message.action_name = job.action_name
+            message.logfile = job.log_filename
 
-        # Send it off to the log handler (can be the frontend,
-        # or it can be the child task which will propagate
-        # to the frontend)
-        assert self._locals.message_handler
+            # If no key has been set at this point, and the element job has
+            # a related key, set it.
+            if message.element_key is None:
+                message.element_key = job.element_key
 
-        self._locals.message_handler(message, is_silenced=self._silent_messages())
+            # Jobs always record messages
+            self._locals.job.record_message(message)
+
+            # Don't forward LOG messages from jobs to the message handler
+            if message.message_type == MessageType.LOG:
+                return
+
+        # Don't forward if it is currently silent
+        if self._silent_messages() and (message.message_type not in unconditional_messages):
+            return
+
+        assert self._message_handler is not None
+        self._message_handler(message)
 
     # silence()
     #
@@ -140,12 +234,22 @@ class Messenger:
             yield
             return
 
-        self._locals.silence_scope_depth += 1
+        in_job = self._locals.job is not None
+
+        if in_job:
+            self._locals.job.silence_scope_depth += 1
+        else:
+            self._global_silence_scope_depth += 1
+
         try:
             yield
         finally:
-            assert self._locals.silence_scope_depth > 0
-            self._locals.silence_scope_depth -= 1
+            if in_job:
+                assert self._locals.job.silence_scope_depth > 0
+                self._locals.job.silence_scope_depth -= 1
+            else:
+                assert self._global_silence_scope_depth > 0
+                self._global_silence_scope_depth -= 1
 
     # timed_activity()
     #
@@ -254,7 +358,7 @@ class Messenger:
             )
             self.message(message)
 
-    # recorded_messages()
+    # record_job()
     #
     # Records all messages in a log file while the context manager
     # is active.
@@ -274,42 +378,20 @@ class Messenger:
     # Yields: The fully qualified log filename
     #
     @contextmanager
-    def recorded_messages(self, filename: str, logdir: str) -> Generator[str, None, None]:
+    def record_job(
+        self, action_name: str, element_key: str, filename: str, logdir: str
+    ) -> Generator[_JobRecorder, None, None]:
         # We dont allow recursing in this context manager, and
         # we also do not allow it in the main process.
-        assert not hasattr(self._locals, "log_handle") or self._locals.log_handle is None
-        assert not hasattr(self._locals, "log_filename") or self._locals.log_filename is None
+        assert not hasattr(self._locals, "job") or self._locals.job is None
 
-        # Create the fully qualified logfile in the log directory,
-        # appending the pid and .log extension at the end.
-        self._locals.log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
-        self._locals.silence_scope_depth = 0
+        log_filename = os.path.join(logdir, "{}.{}.log".format(filename, os.getpid()))
+        self._locals.job = _JobRecorder(action_name, element_key, log_filename)
 
-        # Ensure the directory exists first
-        directory = os.path.dirname(self._locals.log_filename)
-        os.makedirs(directory, exist_ok=True)
-
-        with open(self._locals.log_filename, "a") as logfile:
+        with self._locals.job.enable_recording() as job:
+            yield job
 
-            # Write one last line to the log and flush it to disk
-            def flush_log():
-
-                # If the process currently had something happening in the I/O stack
-                # then trying to reenter the I/O stack will fire a runtime error.
-                #
-                # So just try to flush as well as we can at SIGTERM time
-                try:
-                    logfile.write("\n\nForcefully terminated\n")
-                    logfile.flush()
-                except RuntimeError:
-                    os.fsync(logfile.fileno())
-
-            self._locals.log_handle = logfile
-            with _signals.terminator(flush_log):
-                yield self._locals.log_filename
-
-            self._locals.log_handle = None
-            self._locals.log_filename = None
+        self._locals.job = None
 
     # get_log_handle()
     #
@@ -320,7 +402,9 @@ class Messenger:
     # Returns: The active logging file handle, or None
     #
     def get_log_handle(self) -> Optional[str]:
-        return self._locals.log_handle
+        if self._locals.job is not None:
+            return self._locals.job.log_handle
+        return None
 
     # get_log_filename()
     #
@@ -331,7 +415,7 @@ class Messenger:
     # Returns: The active logging filename, or None
     #
     def get_log_filename(self) -> str:
-        return self._locals.log_filename
+        return self._locals.job.log_filename
 
     # timed_suspendable()
     #
@@ -361,55 +445,6 @@ class Messenger:
         with _signals.suspendable(stop_time, resume_time):
             yield timedata
 
-    # _record_message()
-    #
-    # Records the message if recording is enabled
-    #
-    # Args:
-    #    message (Message): The message to record
-    #
-    def _record_message(self, message: Message) -> None:
-
-        if self._locals.log_handle is None:
-            return
-
-        INDENT = "    "
-        EMPTYTIME = "--:--:--"
-        template = "[{timecode: <8}] {type: <7}"
-
-        # If this message is associated with an element or source plugin, print the
-        # full element name of the instance.
-        element_name = ""
-        if message.element_name:
-            template += " {element_name}"
-            element_name = message.element_name
-
-        template += ": {message}"
-
-        detail = ""
-        if message.detail is not None:
-            template += "\n\n{detail}"
-            detail = message.detail.rstrip("\n")
-            detail = INDENT + INDENT.join(detail.splitlines(True))
-
-        timecode = EMPTYTIME
-        if message.message_type in (MessageType.SUCCESS, MessageType.FAIL):
-            hours, remainder = divmod(int(message.elapsed.total_seconds()), 60 ** 2)
-            minutes, seconds = divmod(remainder, 60)
-            timecode = "{0:02d}:{1:02d}:{2:02d}".format(hours, minutes, seconds)
-
-        text = template.format(
-            timecode=timecode,
-            element_name=element_name,
-            type=message.message_type.upper(),
-            message=message.message,
-            detail=detail,
-        )
-
-        # Write to the open log file
-        self._locals.log_handle.write("{}\n".format(text))
-        self._locals.log_handle.flush()
-
     # _render_status()
     #
     # Calls the render status callback set in the messenger, but only if a
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 34ff768..a80832d 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -29,7 +29,7 @@ import traceback
 
 # BuildStream toplevel imports
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
 from ...types import FastEnum
 from ._job import abort_thread
 from ..._signals import TerminateException
@@ -69,8 +69,7 @@ class _Envelope:
 
 
 class _MessageType(FastEnum):
-    LOG_MESSAGE = 1
-    RESULT = 2
+    RESULT = 1
 
 
 # Job()
@@ -373,11 +372,7 @@ class Job:
         if not self._listening:
             return
 
-        if envelope.message_type is _MessageType.LOG_MESSAGE:
-            # Propagate received messages from children
-            # back through the context.
-            self._messenger.message(envelope.message)
-        elif envelope.message_type is _MessageType.RESULT:
+        if envelope.message_type is _MessageType.RESULT:
             assert self._result is None
             self._result = envelope.message
         else:
@@ -525,15 +520,14 @@ class ChildJob:
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._pipe_w = pipe_w
-        self._messenger.set_message_handler(self._child_message_handler)
 
         # Time, log and and run the action function
         #
-        with self._messenger.timed_suspendable() as timeinfo, self._messenger.recorded_messages(
-            self._logfile, self._logdir
+        with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
+            self.action_name, self._message_element_key, self._logfile, self._logdir
         ) as filename:
             try:
-                self.message(MessageType.START, self.action_name, logfile=filename)
+                self.message(MessageType.START, self.action_name)
 
                 with self._terminate_lock:
                     self._thread_id = threading.current_thread().ident
@@ -558,7 +552,6 @@ class ChildJob:
                             MessageType.FAIL,
                             "Try #{} failed, retrying".format(self._tries),
                             elapsed=elapsed,
-                            logfile=filename,
                         )
                     else:
                         self.message(
@@ -566,7 +559,6 @@ class ChildJob:
                             str(e),
                             elapsed=elapsed,
                             detail=e.detail,
-                            logfile=filename,
                             sandbox=e.sandbox,
                         )
 
@@ -585,7 +577,7 @@ class ChildJob:
                     elapsed = datetime.datetime.now() - timeinfo.start_time
                     detail = "An unhandled exception occured:\n\n{}".format(traceback.format_exc())
 
-                    self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
+                    self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail)
                     # Unhandled exceptions should permenantly fail
                     return _ReturnCode.PERM_FAIL
 
@@ -594,7 +586,7 @@ class ChildJob:
                     self._child_send_result(result)
 
                     elapsed = datetime.datetime.now() - timeinfo.start_time
-                    self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
+                    self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed)
 
                     # Shutdown needs to stay outside of the above context manager,
                     # make sure we dont try to handle SIGTERM while the process
@@ -641,38 +633,6 @@ class ChildJob:
         if result is not None:
             self._send_message(_MessageType.RESULT, result)
 
-    # _child_message_handler()
-    #
-    # A Context delegate for handling messages, this replaces the
-    # frontend's main message handler in the context of a child task
-    # and performs local logging to the local log file before sending
-    # the message back to the parent process for further propagation.
-    # The related element display key is added to the message for
-    # widget rendering if not already set for an element childjob.
-    #
-    # Args:
-    #    message     (Message): The message to log
-    #    is_silenced (bool)   : Whether messages are silenced
-    #
-    def _child_message_handler(self, message, is_silenced):
-
-        message.action_name = self.action_name
-
-        # If no key has been set at this point, and the element job has
-        # a related key, set it. This is needed for messages going
-        # straight to the message handler from the child process.
-        if message.element_key is None and self._message_element_key:
-            message.element_key = self._message_element_key
-
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
-        if message.message_type == MessageType.LOG:
-            return
-
-        self._send_message(_MessageType.LOG_MESSAGE, message)
-
     def terminate(self):
         if self._should_terminate:
             return
diff --git a/tests/testutils/context.py b/tests/testutils/context.py
index 821adef..ab14c1b 100644
--- a/tests/testutils/context.py
+++ b/tests/testutils/context.py
@@ -23,7 +23,7 @@ from buildstream._context import Context
 
 
 # Handle messages from the pipeline
-def _dummy_message_handler(message, is_silenced):
+def _dummy_message_handler(message):
     pass