You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:25:52 UTC

[buildstream] 09/11: _messenger.py: Introduce a 'root' message handler

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch bschubert/no-multiprocessing-bak
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 82197ded517fd2a37370de58d0c65043cb33084b
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Tue Jul 7 22:37:27 2020 +0100

    _messenger.py: Introduce a 'root' message handler
    
    This is to be set in the main process and allows jobs to explicitly
    send a message back to the main handler
---
 src/buildstream/_frontend/app.py       |  2 +-
 src/buildstream/_messenger.py          | 25 ++++++++++++++++++++++---
 src/buildstream/_scheduler/jobs/job.py |  7 +------
 tests/testutils/context.py             |  2 +-
 4 files changed, 25 insertions(+), 11 deletions(-)

diff --git a/src/buildstream/_frontend/app.py b/src/buildstream/_frontend/app.py
index 3160e8b..05d2491 100644
--- a/src/buildstream/_frontend/app.py
+++ b/src/buildstream/_frontend/app.py
@@ -240,7 +240,7 @@ class App:
             )
 
             # Propagate pipeline feedback to the user
-            self.context.messenger.set_message_handler(self._message_handler)
+            self.context.messenger.set_root_message_handler(self._message_handler)
 
             # Check if throttling frontend updates to tick rate
             self._cache_messages = self.context.log_throttle_updates
diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index e6456ef..f93e0ad 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -210,6 +210,8 @@ class Messenger:
         self._active_simple_tasks: int = 0
         self._render_status_cb: Optional[Callable[[], None]] = None
 
+        self._root_message_handler: Optional[_MessageHandler] = None
+
         self._locals = threading.local()
         self._locals.message_handler = None
 
@@ -224,6 +226,18 @@ class Messenger:
     def set_message_handler(self, handler: MessageHandlerCallback) -> None:
         self._locals.message_handler = _MessageHandler(handler)
 
+    # set_root_message_handler()
+    #
+    # Sets the handler for any status messages propagated through
+    # the context.
+    #
+    # Args:
+    #   handler: The handler to call on message
+    #
+    def set_root_message_handler(self, handler: MessageHandlerCallback) -> None:
+        self._root_message_handler = _MessageHandler(handler)
+        self._locals.message_handler = self._root_message_handler
+
     # set_state()
     #
     # Sets the State object within the Messenger
@@ -251,9 +265,14 @@ class Messenger:
     #
     # Args:
     #    message: A Message object
-    #
-    def message(self, message: Message) -> None:
-        self._message_handler.message(message)
+    #    use_root_handler: Whether to use the root handler or the thread local one
+    #
+    def message(self, message: Message, use_root_handler: bool = False) -> None:
+        if use_root_handler:
+            assert self._root_message_handler is not None
+            self._root_message_handler.message(message)
+        else:
+            self._message_handler.message(message)
 
     # silence()
     #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index f331d3f..adff992 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -67,7 +67,6 @@ class _Envelope:
 
 
 class _MessageType(FastEnum):
-    LOG_MESSAGE = 1
     ERROR = 2
     RESULT = 3
 
@@ -389,10 +388,6 @@ class Job:
         if not self._listening:
             return
 
-        if envelope.message_type is _MessageType.LOG_MESSAGE:
-            # Propagate received messages from children
-            # back through the context.
-            self._messenger.message(envelope.message)
         elif envelope.message_type is _MessageType.ERROR:
             # For regression tests only, save the last error domain / reason
             # reported from a child task in the main process, this global state
@@ -691,4 +686,4 @@ class ChildJob:
         if message.message_type == MessageType.LOG:
             return
 
-        self._send_message(_MessageType.LOG_MESSAGE, message)
+        self._messenger.message(message, use_root_handler=True)
diff --git a/tests/testutils/context.py b/tests/testutils/context.py
index 821adef..6850c00 100644
--- a/tests/testutils/context.py
+++ b/tests/testutils/context.py
@@ -74,7 +74,7 @@ def dummy_context(*, config=None):
 
         context.load(config=config)
 
-        context.messenger.set_message_handler(_dummy_message_handler)
+        context.messenger.set_root_message_handler(_dummy_message_handler)
         context.messenger.simple_task = MethodType(_get_dummy_task, context.messenger)
 
         yield context