You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@buildstream.apache.org by no...@apache.org on 2020/12/29 12:25:54 UTC

[buildstream] 11/11: job.py: Completely remove the need for a queue between parent and child jobs

This is an automated email from the ASF dual-hosted git repository.

not-in-ldap pushed a commit to branch bschubert/no-multiprocessing-bak
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit f26ab9fdbbfc297a7ddea7a7b2312573273f4b73
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Wed Jul 8 12:02:28 2020 +0000

    job.py: Completely remove the need for a queue between parent and child jobs
    
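    We don't need that distinction anymore: the child action is now awaited
    through loop.run_in_executor() and returns its (return code, result)
    tuple directly. The multiprocessing pipe and the message envelopes that
    carried the result back to the parent are therefore no longer needed.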
---
 src/buildstream/_scheduler/jobs/job.py | 160 +++------------------------------
 1 file changed, 12 insertions(+), 148 deletions(-)

diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b253c1b..1ce66f6 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -23,7 +23,6 @@
 # System imports
 import asyncio
 import datetime
-import multiprocessing
 import traceback
 
 # BuildStream toplevel imports
@@ -59,17 +58,6 @@ class JobStatus(FastEnum):
     SKIPPED = 3
 
 
-# Used to distinguish between status messages and return values
-class _Envelope:
-    def __init__(self, message_type, message):
-        self.message_type = message_type
-        self.message = message
-
-
-class _MessageType(FastEnum):
-    RESULT = 3
-
-
 # Job()
 #
 # The Job object represents a task that will run in parallel to the main
@@ -119,9 +107,6 @@ class Job:
         #
         self._scheduler = scheduler  # The scheduler
         self._messenger = self._scheduler.context.messenger
-        self._pipe_r = None  # The read end of a pipe for message passing
-        self._listening = False  # Whether the parent is currently listening
-        self._suspended = False  # Whether this job is currently suspended
         self._max_retries = max_retries  # Maximum number of automatic retries
         self._result = None  # Return value of child action in the parent
         self._tries = 0  # Try count, for retryable jobs
@@ -148,11 +133,7 @@ class Job:
 
         assert not self._terminated, "Attempted to start process which was already terminated"
 
-        # FIXME: remove this, this is not necessary when using asyncio
-        self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
         self._tries += 1
-        self._parent_start_listening()
 
         # FIXME: remove the parent/child separation, it's not needed anymore.
         child_job = self.create_child_job(  # pylint: disable=assignment-from-no-return
@@ -170,12 +151,13 @@ class Job:
 
         async def execute():
             try:
-                result = await loop.run_in_executor(None, child_job.child_action, pipe_w)
+                ret_code, self._result = await loop.run_in_executor(None, child_job.child_action)
             except asyncio.CancelledError:
-                result = _ReturnCode.TERMINATED
+                ret_code = _ReturnCode.TERMINATED
             except Exception:  # pylint: disable=broad-except
-                result = _ReturnCode.FAIL
-            await self._parent_child_completed(result)
+                traceback.print_exc()
+                ret_code = _ReturnCode.FAIL
+            await self._parent_child_completed(ret_code)
 
         self._task = loop.create_task(execute())
 
@@ -188,9 +170,6 @@ class Job:
     def terminate(self):
         self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
 
-        # Make sure there is no garbage on the pipe
-        self._parent_stop_listening()
-
         # Terminate the process using multiprocessing API pathway
         if self._task:
             self._task.cancel()
@@ -302,16 +281,6 @@ class Job:
     #                  Local Private Methods              #
     #######################################################
 
-    # _parent_shutdown()
-    #
-    # Shuts down the Job on the parent side by reading any remaining
-    # messages on the message pipe and cleaning up any resources.
-    #
-    def _parent_shutdown(self):
-        # Make sure we've read everything we need and then stop listening
-        self._parent_process_pipe()
-        self._parent_stop_listening()
-
     # _parent_child_completed()
     #
     # Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -320,8 +289,6 @@ class Job:
     #    returncode (int): The return code of the child process
     #
     async def _parent_child_completed(self, returncode):
-        self._parent_shutdown()
-
         try:
             returncode = _ReturnCode(returncode)
         except ValueError:
@@ -368,69 +335,7 @@ class Job:
         self.parent_complete(status, self._result)
         self._scheduler.job_completed(self, status)
 
-        # Force the deletion of the pipe and process objects to try and clean up FDs
-        self._pipe_r.close()
-        self._pipe_r = self._task = None
-
-    # _parent_process_envelope()
-    #
-    # Processes a message Envelope deserialized form the message pipe.
-    #
-    # this will have the side effect of assigning some local state
-    # on the Job in the parent process for later inspection when the
-    # child process completes.
-    #
-    # Args:
-    #    envelope (Envelope): The message envelope
-    #
-    def _parent_process_envelope(self, envelope):
-        if not self._listening:
-            return
-        elif envelope.message_type is _MessageType.RESULT:
-            assert self._result is None
-            self._result = envelope.message
-        else:
-            assert False, "Unhandled message type '{}': {}".format(envelope.message_type, envelope.message)
-
-    # _parent_process_pipe()
-    #
-    # Reads back message envelopes from the message pipe
-    # in the parent process.
-    #
-    def _parent_process_pipe(self):
-        while self._pipe_r.poll():
-            try:
-                envelope = self._pipe_r.recv()
-            except EOFError:
-                self._parent_stop_listening()
-                break
-            self._parent_process_envelope(envelope)
-
-    # _parent_recv()
-    #
-    # A callback to handle I/O events from the message
-    # pipe file descriptor in the main process message loop
-    #
-    def _parent_recv(self, *args):
-        self._parent_process_pipe()
-
-    # _parent_start_listening()
-    #
-    # Starts listening on the message pipe
-    #
-    def _parent_start_listening(self):
-        if not self._listening:
-            self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
-            self._listening = True
-
-    # _parent_stop_listening()
-    #
-    # Stops listening on the message pipe
-    #
-    def _parent_stop_listening(self):
-        if self._listening:
-            self._scheduler.loop.remove_reader(self._pipe_r.fileno())
-            self._listening = False
+        self._task = None
 
 
 # ChildJob()
@@ -471,8 +376,6 @@ class ChildJob:
         self._message_element_name = message_element_name
         self._message_element_key = message_element_key
 
-        self._pipe_w = None  # The write end of a pipe for message passing
-
     # message():
     #
     # Logs a message, this will be logged in the task's logfile and
@@ -525,12 +428,7 @@ class ChildJob:
     # Args:
     #    pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
     #
-    def child_action(self, pipe_w):
-        # Assign the pipe we passed across the process boundaries
-        #
-        # Set the global message handler in this child
-        # process to forward messages to the parent process
-        self._pipe_w = pipe_w
+    def child_action(self):
         self._messenger.set_message_handler(self._child_message_handler)
 
         # Time, log and and run the action function
@@ -548,7 +446,7 @@ class ChildJob:
                 self.message(MessageType.SKIPPED, str(e), elapsed=elapsed, logfile=filename)
 
                 # Alert parent of skip by return code
-                return _ReturnCode.SKIPPED
+                return _ReturnCode.SKIPPED, None
             except BstError as e:
                 elapsed = datetime.datetime.now() - timeinfo.start_time
                 retry_flag = e.temporary
@@ -570,11 +468,11 @@ class ChildJob:
 
                 # Set return code based on whether or not the error was temporary.
                 #
-                return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL
+                return _ReturnCode.FAIL if retry_flag else _ReturnCode.PERM_FAIL, None
 
             except Exception:  # pylint: disable=broad-except
 
-                # If an unhandled (not normalized to BstError) occurs, that's a bug,
+                # If an unhandled (not normalized to BstError) occurs, that's a `bug`,
                 # send the traceback and formatted exception back to the frontend
                 # and print it to the log file.
                 #
@@ -583,55 +481,21 @@ class ChildJob:
 
                 self.message(MessageType.BUG, self.action_name, elapsed=elapsed, detail=detail, logfile=filename)
                 # Unhandled exceptions should permenantly fail
-                return _ReturnCode.PERM_FAIL
+                return _ReturnCode.PERM_FAIL, None
 
             else:
-                # No exception occurred in the action
-                self._child_send_result(result)
-
                 elapsed = datetime.datetime.now() - timeinfo.start_time
                 self.message(MessageType.SUCCESS, self.action_name, elapsed=elapsed, logfile=filename)
 
                 # Shutdown needs to stay outside of the above context manager,
                 # make sure we dont try to handle SIGTERM while the process
                 # is already busy in sys.exit()
-                return _ReturnCode.OK
-            finally:
-                self._pipe_w.close()
+                return _ReturnCode.OK, result
 
     #######################################################
     #                  Local Private Methods              #
     #######################################################
 
-    # _send_message()
-    #
-    # Send data in a message to the parent Job, running in the main process.
-    #
-    # Args:
-    #    message_type (str): The type of message to send.
-    #    message_data (any): A simple object (must be pickle-able, i.e.
-    #                        strings, lists, dicts, numbers, but not Element
-    #                        instances). This is sent to the parent Job.
-    #
-    def _send_message(self, message_type, message_data):
-        self._pipe_w.send(_Envelope(message_type, message_data))
-
-    # _child_send_result()
-    #
-    # Sends the serialized result to the main process through the message pipe
-    #
-    # Args:
-    #    result (any): None, or a simple object (must be pickle-able, i.e.
-    #                  strings, lists, dicts, numbers, but not Element
-    #                  instances).
-    #
-    # Note: If None is passed here, nothing needs to be sent, the
-    #       result member in the parent process will simply remain None.
-    #
-    def _child_send_result(self, result):
-        if result is not None:
-            self._send_message(_MessageType.RESULT, result)
-
     # _child_message_handler()
     #
     # A Context delegate for handling messages, this replaces the