You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/12 21:00:35 UTC

[airflow] branch main updated: misc: create new process group by `set_new_process_group` utility (#24371)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new dd78e29a8c misc: create new process group by `set_new_process_group` utility (#24371)
dd78e29a8c is described below

commit dd78e29a8c858769c9c21752f319e19af7f64377
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Mon Jun 13 01:00:26 2022 +0400

    misc: create new process group by `set_new_process_group` utility (#24371)
---
 airflow/dag_processing/manager.py                | 2 +-
 airflow/task/task_runner/standard_task_runner.py | 4 ++--
 airflow/utils/process_utils.py                   | 5 +++--
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 38f5ffdbbf..3e18a42b5d 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -217,7 +217,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
         # Make this process start as a new process group - that makes it easy
         # to kill all sub-process of this at the OS-level, rather than having
         # to iterate the child processes
-        os.setpgid(0, 0)
+        set_new_process_group()
 
         setproctitle("airflow scheduler -- DagFileProcessorManager")
         # Reload configurations and settings to avoid collision with parent process.
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 53f873ec1f..7aaf05fba3 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -25,7 +25,7 @@ from setproctitle import setproctitle
 
 from airflow.settings import CAN_FORK
 from airflow.task.task_runner.base_task_runner import BaseTaskRunner
-from airflow.utils.process_utils import reap_process_group
+from airflow.utils.process_utils import reap_process_group, set_new_process_group
 
 
 class StandardTaskRunner(BaseTaskRunner):
@@ -53,7 +53,7 @@ class StandardTaskRunner(BaseTaskRunner):
             return psutil.Process(pid)
         else:
             # Start a new process group
-            os.setpgid(0, 0)
+            set_new_process_group()
             import signal
 
             signal.signal(signal.SIGINT, signal.SIG_DFL)
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 4ae1f43980..68d74ebe1e 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -61,8 +61,9 @@ def reap_process_group(
     a SIGKILL will be sent.
 
     :param process_group_id: process group id to kill.
-           The process that wants to create the group should run `os.setpgid(0, 0)` as the first
-           command it executes which will set group id = process_id. Effectively the process that is the
+           The process that wants to create the group should run
+           `airflow.utils.process_utils.set_new_process_group()` as the first command
+           it executes which will set group id = process_id. Effectively the process that is the
            "root" of the group has pid = gid and all other processes in the group have different
            pids but the same gid (equal to the pid of the root process)
     :param logger: log handler