You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by po...@apache.org on 2022/06/12 21:00:35 UTC
[airflow] branch main updated: misc: create new process group by `set_new_process_group` utility (#24371)
This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new dd78e29a8c misc: create new process group by `set_new_process_group` utility (#24371)
dd78e29a8c is described below
commit dd78e29a8c858769c9c21752f319e19af7f64377
Author: Andrey Anshin <An...@taragol.is>
AuthorDate: Mon Jun 13 01:00:26 2022 +0400
misc: create new process group by `set_new_process_group` utility (#24371)
---
airflow/dag_processing/manager.py | 2 +-
airflow/task/task_runner/standard_task_runner.py | 4 ++--
airflow/utils/process_utils.py | 5 +++--
3 files changed, 6 insertions(+), 5 deletions(-)
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 38f5ffdbbf..3e18a42b5d 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -217,7 +217,7 @@ class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
# Make this process start as a new process group - that makes it easy
# to kill all sub-processes of this at the OS-level, rather than having
# to iterate the child processes
- os.setpgid(0, 0)
+ set_new_process_group()
setproctitle("airflow scheduler -- DagFileProcessorManager")
# Reload configurations and settings to avoid collision with parent process.
diff --git a/airflow/task/task_runner/standard_task_runner.py b/airflow/task/task_runner/standard_task_runner.py
index 53f873ec1f..7aaf05fba3 100644
--- a/airflow/task/task_runner/standard_task_runner.py
+++ b/airflow/task/task_runner/standard_task_runner.py
@@ -25,7 +25,7 @@ from setproctitle import setproctitle
from airflow.settings import CAN_FORK
from airflow.task.task_runner.base_task_runner import BaseTaskRunner
-from airflow.utils.process_utils import reap_process_group
+from airflow.utils.process_utils import reap_process_group, set_new_process_group
class StandardTaskRunner(BaseTaskRunner):
@@ -53,7 +53,7 @@ class StandardTaskRunner(BaseTaskRunner):
return psutil.Process(pid)
else:
# Start a new process group
- os.setpgid(0, 0)
+ set_new_process_group()
import signal
signal.signal(signal.SIGINT, signal.SIG_DFL)
diff --git a/airflow/utils/process_utils.py b/airflow/utils/process_utils.py
index 4ae1f43980..68d74ebe1e 100644
--- a/airflow/utils/process_utils.py
+++ b/airflow/utils/process_utils.py
@@ -61,8 +61,9 @@ def reap_process_group(
a SIGKILL will be sent.
:param process_group_id: process group id to kill.
- The process that wants to create the group should run `os.setpgid(0, 0)` as the first
- command it executes which will set group id = process_id. Effectively the process that is the
+ The process that wants to create the group should run
+ `airflow.utils.process_utils.set_new_process_group()` as the first command
+ it executes which will set group id = process_id. Effectively the process that is the
"root" of the group has pid = gid and all other processes in the group have different
pids but the same gid (equal to the pid of the root process)
:param logger: log handler