You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@buildstream.apache.org by be...@apache.org on 2021/01/11 18:15:13 UTC
[buildstream] 02/02: job.py: Completely remove the pipe between child and parent process
This is an automated email from the ASF dual-hosted git repository.
benschubert pushed a commit to branch bschubert/remove-pipe-job
in repository https://gitbox.apache.org/repos/asf/buildstream.git
commit ad4a9aab6cc7eb35b079aa784ae5e676244d26b4
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Mon Jan 11 12:00:07 2021 +0000
job.py: Completely remove the pipe between child and parent process
This pipe is no longer needed at all.
---
src/buildstream/_scheduler/jobs/job.py | 80 ++--------------------------------
1 file changed, 3 insertions(+), 77 deletions(-)
diff --git a/src/buildstream/_scheduler/jobs/job.py b/src/buildstream/_scheduler/jobs/job.py
index b6d7e6c..4e81931 100644
--- a/src/buildstream/_scheduler/jobs/job.py
+++ b/src/buildstream/_scheduler/jobs/job.py
@@ -24,7 +24,6 @@
import asyncio
import datetime
import itertools
-import multiprocessing
import threading
import traceback
@@ -113,8 +112,6 @@ class Job:
#
self._scheduler = scheduler # The scheduler
self._messenger = self._scheduler.context.messenger
- self._pipe_r = None # The read end of a pipe for message passing
- self._listening = False # Whether the parent is currently listening
self._suspended = False # Whether this job is currently suspended
self._max_retries = max_retries # Maximum number of automatic retries
self._result = None # Return value of child action in the parent
@@ -143,11 +140,7 @@ class Job:
assert not self._terminated, "Attempted to start process which was already terminated"
- # FIXME: remove this, this is not necessary when using asyncio
- self._pipe_r, pipe_w = multiprocessing.Pipe(duplex=False)
-
self._tries += 1
- self._parent_start_listening()
# FIXME: remove the parent/child separation, it's not needed anymore.
self._child = self.create_child_job( # pylint: disable=assignment-from-no-return
@@ -164,7 +157,7 @@ class Job:
loop = asyncio.get_event_loop()
async def execute():
- ret_code, self._result = await loop.run_in_executor(None, self._child.child_action, pipe_w)
+ ret_code, self._result = await loop.run_in_executor(None, self._child.child_action)
await self._parent_child_completed(ret_code)
self._task = loop.create_task(execute())
@@ -178,9 +171,6 @@ class Job:
def terminate(self):
self.message(MessageType.STATUS, "{} terminating".format(self.action_name))
- # Make sure there is no garbage on the pipe
- self._parent_stop_listening()
-
if self._task:
self._child.terminate()
@@ -289,16 +279,6 @@ class Job:
# Local Private Methods #
#######################################################
- # _parent_shutdown()
- #
- # Shuts down the Job on the parent side by reading any remaining
- # messages on the message pipe and cleaning up any resources.
- #
- def _parent_shutdown(self):
- # Make sure we've read everything we need and then stop listening
- self._parent_process_pipe()
- self._parent_stop_listening()
-
# _parent_child_completed()
#
# Called in the main process courtesy of asyncio's ChildWatcher.add_child_handler()
@@ -307,8 +287,6 @@ class Job:
# returncode (int): The return code of the child process
#
async def _parent_child_completed(self, returncode):
- self._parent_shutdown()
-
try:
returncode = _ReturnCode(returncode)
except ValueError:
@@ -347,50 +325,7 @@ class Job:
self.parent_complete(status, self._result)
self._scheduler.job_completed(self, status)
-
- # Force the deletion of the pipe and process objects to try and clean up FDs
- self._pipe_r.close()
- self._pipe_r = self._task = None
-
- # _parent_process_pipe()
- #
- # Reads back message envelopes from the message pipe
- # in the parent process.
- #
- def _parent_process_pipe(self):
- while self._pipe_r.poll():
- try:
- self._pipe_r.recv()
- assert False, "No message should be received anymore"
- except EOFError:
- self._parent_stop_listening()
- break
-
- # _parent_recv()
- #
- # A callback to handle I/O events from the message
- # pipe file descriptor in the main process message loop
- #
- def _parent_recv(self, *args):
- self._parent_process_pipe()
-
- # _parent_start_listening()
- #
- # Starts listening on the message pipe
- #
- def _parent_start_listening(self):
- if not self._listening:
- self._scheduler.loop.add_reader(self._pipe_r.fileno(), self._parent_recv)
- self._listening = True
-
- # _parent_stop_listening()
- #
- # Stops listening on the message pipe
- #
- def _parent_stop_listening(self):
- if self._listening:
- self._scheduler.loop.remove_reader(self._pipe_r.fileno())
- self._listening = False
+ self._task = None
# ChildJob()
@@ -431,7 +366,6 @@ class ChildJob:
self._message_element_name = message_element_name
self._message_element_key = message_element_key
- self._pipe_w = None # The write end of a pipe for message passing
self._thread_id = None # Thread in which the child executes its action
self._should_terminate = False
self._terminate_lock = threading.Lock()
@@ -483,15 +417,9 @@ class ChildJob:
#
# Perform the action in the child process, this calls the action_cb.
#
- # Args:
- # pipe_w (multiprocessing.connection.Connection): The message pipe for IPC
- #
- def child_action(self, pipe_w):
- # Assign the pipe we passed across the process boundaries
- #
+ def child_action(self):
# Set the global message handler in this child
# process to forward messages to the parent process
- self._pipe_w = pipe_w
self._messenger.setup_new_action_context(
self.action_name, self._message_element_name, self._message_element_key
)
@@ -572,8 +500,6 @@ class ChildJob:
except TerminateException:
self._thread_id = None
return _ReturnCode.TERMINATED, None
- finally:
- self._pipe_w.close()
# terminate()
#