Posted to commits@buildstream.apache.org by gi...@apache.org on 2020/12/29 13:18:25 UTC

[buildstream] 13/13: WIP

This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch bschubert/no-multiprocessing-full
in repository https://gitbox.apache.org/repos/asf/buildstream.git

commit 5b948b8d96028c7850ec2bdaa19aff3dde09e790
Author: Benjamin Schubert <co...@benschubert.me>
AuthorDate: Fri Jul 10 07:55:03 2020 +0000

    WIP
---
 src/buildstream/plugin.py | 79 ++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 72 insertions(+), 7 deletions(-)

diff --git a/src/buildstream/plugin.py b/src/buildstream/plugin.py
index 0ed6d7d..467c955 100644
--- a/src/buildstream/plugin.py
+++ b/src/buildstream/plugin.py
@@ -112,13 +112,14 @@ Class Reference
 import itertools
 import multiprocessing
 import os
+import signal
 import subprocess
 import sys
 from contextlib import contextmanager
 from typing import Callable, Generator, Optional, Tuple, TypeVar, TYPE_CHECKING
 from weakref import WeakValueDictionary
 
-from . import utils
+from . import utils, _signals
 from ._exceptions import PluginError, ImplError
 from ._message import Message, MessageType
 from .node import MappingNode, ProvenanceInformation
@@ -137,6 +138,18 @@ T2 = TypeVar("T2")
 
 
 def _background_job_wrapper(queue: multiprocessing.Queue, target: Callable[..., T2], args: Tuple) -> None:
+    # Start a new session, so that SIGTSTP signals from grandchildren
+    # are not propagated up to the master process
+    os.setsid()
+
+    # First restore the default signal handlers for the signals
+    # we handle, and then clear their blocked state.
+    #
+    signal_list = [signal.SIGTSTP, signal.SIGTERM]
+    for sig in signal_list:
+        signal.signal(sig, signal.SIG_DFL)
+    signal.pthread_sigmask(signal.SIG_UNBLOCK, signal_list)
+
     queue.put(target(*args))
 
 
@@ -520,15 +533,67 @@ class Plugin:
         ):
             queue = self.__multiprocessing_context.Queue()
 
-            proc = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
-            proc.start()
-
-            result = queue.get()
-            proc.join()
+            process = None
+
+            import psutil
+            from .utils import _kill_process_tree
+
+            # Handle termination, suspend and resume
+            def kill_proc():
+                if not process:
+                    return
+
+                proc = psutil.Process(process.pid)
+
+                # Some callers know that their subprocess can be
+                # gracefully terminated, so make an attempt at that first
+                proc.terminate()
+
+                try:
+                    proc.wait(5)
+                except psutil.TimeoutExpired:
+                    # Did not terminate within the timeout: murder
+                    _kill_process_tree(proc.pid)
+                else:
+                    # FIXME: This is a brutal but reliable approach
+                    #
+                    # Other variations I've tried which try SIGTERM first
+                    # and then wait for child processes to exit gracefully
+                    # have not reliably cleaned up process trees and have
+                    # left orphaned git or ssh processes alive.
+                    #
+                    # This cleans up the subprocesses reliably but may
+                    # cause side effects such as possibly leaving stale
+                    # locks behind. Hopefully this should not be an issue
+                    # as long as any child processes only interact with
+                    # the temp directories which we control and cleanup
+                    # ourselves.
+                    #
+                    _kill_process_tree(proc.pid)
+
+            def suspend_proc():
+                if process:
+                    try:
+                        os.killpg(os.getpgid(process.pid), signal.SIGSTOP)
+                    except ProcessLookupError:
+                        # The process group is already gone
+                        pass
+
+            def resume_proc():
+                if process:
+                    try:
+                        os.killpg(os.getpgid(process.pid), signal.SIGCONT)
+                    except ProcessLookupError:
+                        pass
+
+            with _signals.suspendable(suspend_proc, resume_proc), _signals.terminator(kill_proc):
+                process = self.__multiprocessing_context.Process(target=_background_job_wrapper, args=(queue, target, args))
+
+                with _signals.blocked([signal.SIGINT, signal.SIGTSTP, signal.SIGTERM], ignore=False):
+                    process.start()
+
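+                # Read the result before join(): if the child is still
+                # blocked flushing a large result into the queue's pipe,
+                # joining first could deadlock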
+                result = queue.get()
+                process.join()
 
             return result
 
-
     def call(self, *popenargs, fail: Optional[str] = None, fail_temporarily: bool = False, **kwargs) -> int:
         """A wrapper for subprocess.call()