You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by ro...@apache.org on 2020/12/29 13:41:42 UTC

[buildstream] 10/10: job.py: Pass the results directly between child and parent

This is an automated email from the ASF dual-hosted git repository.

root pushed a commit to branch bschubert/remove-parent-child-pipe
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 1893e3aa2d7fd9e79e9f25f05eaa33b98920db2f
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 12:02:28 2020 +0000

    job.py: Pass the results directly between child and parent
    
    And clean up all the queue-messaging-related parts that are no longer
    required.
---
 src/buildstream/_scheduler/jobs/job.py | 162 ++-------------------------------
 1 file changed, 9 insertions(+), 153 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index a80832d..fed3b58 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -23,7 +23,6 @@
 # System imports
 import asyncio
 import datetime
-import multiprocessing
 import threading
 import traceback
 
@@ -61,17 +60,6 @@ class JobStatus(FastEnum):
     SKIPPED = 3
 
 
-# Used to distinguish between status messages and return values
-class _Envelope:
-    def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
-
-
-class _MessageType(FastEnum):
-    RESULT = 1
-
-
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -116,9 +104,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
-        self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
         self._tries = 0  # Try count, for retryable jobs
@@ -146,11 +131,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         self._child = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -167,8 +148,8 @@ class Job:
         loop = asyncio.get_event_loop()
 
         async def execute():
-            result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
-            await self._parent_child_completed(result)
+            return_code, self._result = await loop.run_in_executor(None, self._child.child_action)
+            await self._parent_child_completed(return_code)
 
         self._task = loop.create_task(execute())
 
@@ -181,9 +162,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         if self._task:
             self._child.terminate()
 
@@ -294,16 +272,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -312,8 +280,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -353,70 +319,7 @@ class Job:
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
 
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_envelope()
-    #
-    # Processes a message Envelope deserialized form the message pipe.
-    #
-    # this will have the side effect of assigning some local state
-    # on the Job in the parent process for later inspection when the
-    # child process completes.
-    #
-    # Args:
-    #    envelope (Envelope): The message envelope
-    #
-    def _parent_process_envelope(self, envelope):
-        if not self._listening:
-            return
-
-        if envelope.message_type is _MessageType.RESULT:
-            assert self._result is None
-            self._result = envelope.message
-        else:
-            assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                envelope = self._pipe_r.recv()
-            except EOFError:
-                self._parent_stop_listening()
-                break
-            self._parent_process_envelope(envelope)
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -457,7 +360,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
         self._thread_id = None  # Thread in which the child executes its action
         self._should_terminate = False
         self._terminate_lock = threading.Lock()
@@ -511,16 +413,7 @@ class ChildJob:
     #
     # Perform the action in the child process, this calls the action_cb.
     #
-    # Args:
-    #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
-    #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
-        # Set the global message handler in this child
-        # process to forward messages to the parent process
-        self._pipe_w = pipe_w
-
+    def child_action(self):
         # Time, log and and run the action function
         #
         with self._messenger.timed_suspendable() as timeinfo, self._messenger.record_job(
@@ -542,7 +435,7 @@ class ChildJob:
                     self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
 
                     # Alert parent of skip by return code
-                    return _ReturnCode.SKIPPED
+                    return _ReturnCode.SKIPPED, None
                 except BstError as e:
                     elapsed = datetime.datetime.now() - timeinfo.start_time
                     retry_flag = e.temporary
@@ -567,7 +460,7 @@ class ChildJob:
 
                     # Set return code based on whether or not the error was temporary.
                     #
-                    return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+                    return (_ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL), None
                 except Exception:  # pylint: disable=broad-except
 
                     # If an unhandled (not normalized to BstError) occurs, that's a bug,
@@ -579,59 +472,22 @@ class ChildJob:
 
                     self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail)
                     # Unhandled exceptions should permenantly fail
-                    return _ReturnCode.PERM_FAIL
+                    return _ReturnCode.PERM_FAIL, None
 
                 else:
                     # No exception occurred in the action
-                    self._child_send_result(result)
-
                     elapsed = datetime.datetime.now() - timeinfo.start_time
                     self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed)
 
                     # Shutdown needs to stay outside of the above context manager,
                     # make sure we dont try to handle SIGTERM while the process
                     # is already busy in sys.exit()
-                    return _ReturnCode.OK
+                    return _ReturnCode.OK, result
                 finally:
                     self._thread_id = None
             except TerminateException:
                 self._thread_id = None
-                return _ReturnCode.TERMINATED
-            finally:
-                self._pipe_w.close()
-
-    #######################################################
-    #                  Local Private Methods              #
-    #######################################################
-
-    # _send_message()
-    #
-    # Send data in a message to the parent Job, running in the main process.
-    #
-    # Args:
-    #    message_type (str): The type of message to send.
-    #    message_data (any): A simple object (must be pickle-able, i.e.
-    #                        strings, lists, dicts, numbers, but not Element
-    #                        instances). This is sent to the parent Job.
-    #
-    def _send_message(self, message_type, message_data):
-        self._pipe_w.send(_Envelope(message_type, message_data))
-
-    # _child_send_result()
-    #
-    # Sends the serialized result to the main process through the message pipe
-    #
-    # Args:
-    #    result (any): None, or a simple object (must be pickle-able, i.e.
-    #                  strings, lists, dicts, numbers, but not Element
-    #                  instances).
-    #
-    # Note: If None is passed here, nothing needs to be sent, the
-    #       result member in the parent process will simply remain None.
-    #
-    def _child_send_result(self, result):
-        if result is not None:
-            self._send_message(_MessageType.RESULT, result)
+                return _ReturnCode.TERMINATED, None
 
     def terminate(self):
         if self._should_terminate: