You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by GitBox <gi...@apache.org> on 2019/11/26 13:38:30 UTC

[GitHub] [airflow] potiuk commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.

potiuk commented on a change in pull request #6627: [AIRFLOW-5931] Use os.fork when appropriate to speed up task execution.
URL: https://github.com/apache/airflow/pull/6627#discussion_r350745070
 
 

 ##########
 File path: airflow/utils/helpers.py
 ##########
 @@ -263,71 +263,82 @@ def f(t):
     return s
 
 
-def reap_process_group(pid, log, sig=signal.SIGTERM,
+def reap_process_group(pgid, log, sig=signal.SIGTERM,
                        timeout=DEFAULT_TIME_TO_WAIT_AFTER_SIGTERM):
     """
-    Tries really hard to terminate all children (including grandchildren). Will send
+    Tries really hard to terminate all processes in the group (including grandchildren). Will send
     sig (SIGTERM) to the process group of pid. If any process is alive after timeout
     a SIGKILL will be sent.
 
     :param log: log handler
-    :param pid: pid to kill
+    :param pgid: process group id to kill
     :param sig: signal type
     :param timeout: how much time a process has to terminate
     """
 
+    returncodes = {}
+
     def on_terminate(p):
         log.info("Process %s (%s) terminated with exit code %s", p, p.pid, p.returncode)
+        returncodes[p.pid] = p.returncode
 
-    if pid == os.getpid():
-        raise RuntimeError("I refuse to kill myself")
-
-    try:
-        parent = psutil.Process(pid)
-    except psutil.NoSuchProcess:
-        # Race condition - the process already exited
-        return
+    def signal_procs(sig):
+        try:
+            os.killpg(pgid, sig)
+        except OSError as err:
+            if err.errno == errno.ESRCH:
+                return returncodes
+            # If operation not permitted error is thrown due to run_as_user,
+            # use sudo -n(--non-interactive) to kill the process
+            if err.errno == errno.EPERM:
+                subprocess.check_call(
+                    ["sudo", "-n", "kill", "-" + str(sig)] + map(children, lambda p: str(p.pid))
+                )
+            else:
+                raise
 
-    children = parent.children(recursive=True)
-    children.append(parent)
+    if pgid == os.getpgid(0):
+        raise RuntimeError("I refuse to kill myself")
 
 Review comment:
   Love it :)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services