You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by be...@apache.org on 2021/01/12 10:13:25 UTC

[buildstream] branch bschubert/remove-pipe-job updated (ad4a9aa -> 2149c97)

This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a change to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git.


 discard ad4a9aa  job.py: Completely remove the pipe between child and parent process
 discard 1dc2ccd  job.py: Stop using the queue to send data between the child and parent
     new 01b4777  job.py: Stop using the queue to send data between the child and parent
     new 2149c97  job.py: Completely remove the pipe between child and parent process

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user force-pushes a change (git push --force), leaving the
repository with a history that looks something like this:

 * -- * -- B -- O -- O -- O   (ad4a9aa)
            \
             N -- N -- N   refs/heads/bschubert/remove-pipe-job (2149c97)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/buildstream/_messenger.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)


[buildstream] 02/02: job.py: Completely remove the pipe between child and parent process

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 2149c97314dc012d437c71a33f59c52391572227
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 12:00:07 2021 +0000

    job.py: Completely remove the pipe between child and parent process
    
    This pipe is not needed at all anymore
---
 src/buildstream/_scheduler/jobs/job.py | 80 ++--------------------------------
 1 file changed, 3 insertions(+), 77 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b6d7e6c..4e81931 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
 import asyncio
 import datetime
 import itertools
-import multiprocessing
 import threading
 import traceback
 
@@ -113,8 +112,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
@@ -143,11 +140,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@ class Job:
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         if self._task:
             self._child.terminate()
 
@@ -289,16 +279,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -347,50 +325,7 @@ class Job:
 
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
-
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                self._pipe_r.recv()
-                assert False, "No message should be received anymore"
-            except EOFError:
-                self._parent_stop_listening()
-                break
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -431,7 +366,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
         self._thread_id = None  # Thread in which the child executes its action
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
@@ -483,15 +417,9 @@ class ChildJob:
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
+    def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._pipe_w = pipe_w
         self._messenger.setup_new_action_context(
             self.action_name, self._message_element_name, self._message_element_key
         )
@@ -572,8 +500,6 @@ class ChildJob:
             except TerminateException:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
-            finally:
-                self._pipe_w.close()
 
     # terminate()
     #


[buildstream] 01/02: job.py: Stop using the queue to send data between the child and parent

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 01b47778a5139a81f00498d994132434f474f1d4
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 11:48:35 2021 +0000

    job.py: Stop using the queue to send data between the child and parent
    
    This removes the need to have all messages processed in the master
    thread, and instead allows them to be done in any thread.
    
    * _messenger.py:
    
      - Store optional job information in the thread local
        storage and expand the message with it if it is present.
    
      - Make the message handler something global and remove the need to
        have a thread-specific one.
    
      - Have message filter out silenced and LOG messages from jobs
    
    * job.py: Remove the job-specific message handler
---
 src/buildstream/_messenger.py          | 56 ++++++++++++++++++++++++++--------
 src/buildstream/_scheduler/jobs/job.py | 45 ++++-----------------------
 2 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/src/buildstream/_messenger.py b/src/buildstream/_messenger.py
index 01a8cfd..edb79ec 100644
--- a/src/buildstream/_messenger.py
+++ b/src/buildstream/_messenger.py
@@ -25,7 +25,7 @@ from typing import Optional, Callable, Iterator, TextIO
 
 from . import _signals
 from ._exceptions import BstError
-from ._message import Message, MessageType
+from ._message import Message, MessageType, unconditional_messages
 from ._state import State, Task
 
 
@@ -48,6 +48,13 @@ class _TimeData:
         self.start_time: datetime.datetime = start_time
 
 
+class _JobInfo:
+    def __init__(self, action_name: str, element_name: str, element_key: str) -> None:
+        self.action_name = action_name
+        self.element_name = element_name
+        self.element_key = element_key
+
+
 # _MessengerLocal
 #
 # Thread local storage for the messenger
@@ -56,13 +63,6 @@ class _MessengerLocal(threading.local):
     def __init__(self) -> None:
         super().__init__()
 
-        # The callback to call when propagating messages
-        #
-        # FIXME: The message handler is currently not strongly typed,
-        #        as it uses a kwarg, we cannot declare it with Callable.
-        #        We can use `Protocol` to strongly type this with python >= 3.8
-        self.message_handler = None
-
         # The open file handle for this task
         self.log_handle: Optional[TextIO] = None
 
@@ -72,6 +72,9 @@ class _MessengerLocal(threading.local):
         # Level of silent messages depth in this task
         self.silence_scope_depth: int = 0
 
+        # Job
+        self.job: Optional[_JobInfo] = None
+
 
 # Messenger()
 #
@@ -97,8 +100,16 @@ class Messenger:
         # Thread local storage
         self._locals: _MessengerLocal = _MessengerLocal()
 
-    def setup_new_action_context(self) -> None:
+        # The callback to call when propagating messages
+        #
+        # FIXME: The message handler is currently not strongly typed,
+        #        as it uses a kwarg, we cannot declare it with Callable.
+        #        We can use `Protocol` to strongly type this with python >= 3.8
+        self._message_handler = None
+
+    def setup_new_action_context(self, action_name: str, element_name: str, element_key: str) -> None:
         self._locals.silence_scope_depth = 0
+        self._locals.job = _JobInfo(action_name, element_name, element_key)
 
     # set_message_handler()
     #
@@ -106,7 +117,7 @@ class Messenger:
     # the messenger.
     #
     def set_message_handler(self, handler) -> None:
-        self._locals.message_handler = handler
+        self._message_handler = handler
 
     # set_state()
     #
@@ -140,12 +151,31 @@ class Messenger:
         # If we are recording messages, dump a copy into the open log file.
         self._record_message(message)
 
+        # Always add the log filename automatically
+        message.logfile = self._locals.log_filename
+
+        is_silenced = self._silent_messages()
+        job = self._locals.job
+
+        if job is not None:
+            # Automatically add message information from the job context
+            message.action_name = job.action_name
+            message.task_element_name = job.element_name
+            message.task_element_key = job.element_key
+
+            # Don't forward LOG messages from jobs
+            if message.message_type == MessageType.LOG:
+                return
+
+            # Don't forward JOB messages if they are currently silent
+            if is_silenced and (message.message_type not in unconditional_messages):
+                return
+
         # Send it off to the log handler (can be the frontend,
         # or it can be the child task which will propagate
         # to the frontend)
-        assert self._locals.message_handler
-
-        self._locals.message_handler(message, is_silenced=self._silent_messages())
+        assert self._message_handler
+        self._message_handler(message, is_silenced=is_silenced)
 
     # status():
     #
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index 227d3a2..b6d7e6c 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -31,7 +31,7 @@ import traceback
 # BuildStream toplevel imports
 from ... import utils
 from ..._exceptions import ImplError, BstError, set_last_task_error, SkipJob
-from ..._message import Message, MessageType, unconditional_messages
+from ..._message import Message, MessageType
 from ...types import FastEnum
 from ._job import terminate_thread
 from ..._signals import TerminateException
@@ -360,13 +360,12 @@ class Job:
     def _parent_process_pipe(self):
         while self._pipe_r.poll():
             try:
-                message = self._pipe_r.recv()
+                self._pipe_r.recv()
+                assert False, "No message should be received anymore"
             except EOFError:
                 self._parent_stop_listening()
                 break
 
-            self._messenger.message(message)
-
     # _parent_recv()
     #
     # A callback to handle I/O events from the message
@@ -493,8 +492,9 @@ class ChildJob:
         # Set the global message handler in this child
         # process to forward messages to the parent process
         self._pipe_w = pipe_w
-        self._messenger.setup_new_action_context()
-        self._messenger.set_message_handler(self._child_message_handler)
+        self._messenger.setup_new_action_context(
+            self.action_name, self._message_element_name, self._message_element_key
+        )
 
         # Time, log and and run the action function
         #
@@ -593,36 +593,3 @@ class ChildJob:
                 return
 
         terminate_thread(self._thread_id)
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
-
-    # _child_message_handler()
-    #
-    # A Context delegate for handling messages, this replaces the
-    # frontend's main message handler in the context of a child task
-    # and performs local logging to the local log file before sending
-    # the message back to the parent process for further propagation.
-    # The related element display key is added to the message for
-    # widget rendering if not already set for an element childjob.
-    #
-    # Args:
-    #    message     (Message): The message to log
-    #    is_silenced (bool)   : Whether messages are silenced
-    #
-    def _child_message_handler(self, message, is_silenced):
-
-        message.action_name = self.action_name
-        message.task_element_name = self._message_element_name
-        message.task_element_key = self._message_element_key
-
-        # Send to frontend if appropriate
-        if is_silenced and (message.message_type not in unconditional_messages):
-            return
-
-        # Don't bother propagating these to the frontend
-        if message.message_type == MessageType.LOG:
-            return
-
-        self._pipe_w.send(message)