You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by be...@apache.org on 2021/01/11 18:15:13 UTC

[buildstream] 02/02: job.py: Completely remove the pipe between child and parent process

This is an automated email from the ASF dual-hosted git repository.

benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit ad4a9aab6cc7eb35b079aa784ae5e676244d26b4
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 12:00:07 2021 +0000

    job.py: Completely remove the pipe between child and parent process
    
    This pipe is no longer needed: no messages are sent over it anymore.
---
 src/buildstream/_scheduler/jobs/job.py | 80 ++--------------------------------
 1 file changed, 3 insertions(+), 77 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b6d7e6c..4e81931 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
 import asyncio
 import datetime
 import itertools
-import multiprocessing
 import threading
 import traceback
 
@@ -113,8 +112,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
         self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
@@ -143,11 +140,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@ class Job:
         loop = asyncio.get_event_loop()
 
         async def execute():
-            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+            ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
             await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         if self._task:
             self._child.terminate()
 
@@ -289,16 +279,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -347,50 +325,7 @@ class Job:
 
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
-
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                self._pipe_r.recv()
-                assert False, "No message should be received anymore"
-            except EOFError:
-                self._parent_stop_listening()
-                break
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -431,7 +366,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
         self._thread_id = None  # Thread in which the child executes its action
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
@@ -483,15 +417,9 @@ class ChildJob:
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
+    def child_action(self):
         # Set the global message handler in this child
         # process to forward messages to the parent process
-        self._pipe_w = pipe_w
         self._messenger.setup_new_action_context(
             self.action_name, self._message_element_name, self._message_element_key
         )
@@ -572,8 +500,6 @@ class ChildJob:
             except TerminateException:
                 self._thread_id = None
                 return _ReturnCode.TERMINATED, None
-            finally:
-                self._pipe_w.close()
 
     # terminate()
     #