Posted to commits@airflow.apache.org by ka...@apache.org on 2020/11/19 19:46:14 UTC

[airflow] 05/06: When sending tasks to celery from a sub-process, reset signal handlers (#11278)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit a63fe94c80cffbe87bd880cc1bbbdb764b317dbd
Author: Ash Berlin-Taylor <as...@firemirror.com>
AuthorDate: Mon Oct 5 14:16:08 2020 +0100

    When sending tasks to celery from a sub-process, reset signal handlers (#11278)
    
    Since these processes are spawned from SchedulerJob after it has
    registered its signal handlers, if any of them got signaled it would run
    those handlers and kill the ProcessorAgent process group! (On Linux,
    multiprocessing's default start method is fork, so the children inherit
    all of the parent's state: the signal handlers, and access to
    `_process.pid` inside the ProcessorAgent instance.)
    
    This behaviour is not what we want for these multiprocessing.Pool processes.
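A minimal sketch of the inheritance problem and of the fix, assuming a Linux host where multiprocessing's "fork" start method is available. It is not Airflow code: `scheduler_style_handler`, `sigterm_disposition` and `check` are hypothetical helpers, and only `reset_signals` mirrors the initializer this commit adds to `CeleryExecutor`. Each pool worker reports whether its SIGTERM handler is still the one inherited from the parent.

```python
import multiprocessing
import signal


def scheduler_style_handler(signum, frame):
    # Hypothetical stand-in for the handler SchedulerJob registers. Per the
    # commit message, the real one ends up killing the ProcessorAgent process
    # group; this one deliberately does nothing.
    pass


def reset_signals():
    # Mirrors the initializer added in this commit: restore the default
    # disposition for SIGINT and SIGTERM in each pool worker.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)


def sigterm_disposition(_):
    # Runs inside a pool worker: report which SIGTERM handler it ended up with.
    handler = signal.getsignal(signal.SIGTERM)
    return "default" if handler == signal.SIG_DFL else "inherited"


def check(initializer=None):
    # Force the "fork" start method so the workers start as copies of this
    # process, Python-level signal handlers included.
    ctx = multiprocessing.get_context("fork")
    pool = ctx.Pool(processes=2, initializer=initializer)
    try:
        return pool.map(sigterm_disposition, range(2))
    finally:
        # close() + join() lets the workers exit on their own sentinels instead
        # of relying on terminate(), which delivers SIGTERM to live workers.
        pool.close()
        pool.join()


if __name__ == "__main__":
    signal.signal(signal.SIGTERM, scheduler_style_handler)
    print("without initializer:", check())              # ['inherited', 'inherited']
    print("with reset_signals:", check(reset_signals))  # ['default', 'default']
```

The plain pool's workers still carry the parent's handler, while workers started with `reset_signals` are back to `SIG_DFL`; the parent keeps its own handler either way, since the initializer only runs in the children. The pool is shut down with `close()`/`join()` rather than `terminate()` because `terminate()` sends SIGTERM to any worker still alive, and a worker that inherited a handler which returns normally might not exit.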
    
    This _may_ be a source of the long-standing "scheduler is alive but not
    scheduling any jobs" problem. Maybe.
    
    (cherry picked from commit baa980fbc83b25b5cf0700e567e69c2eb156412f)
---
 airflow/executors/celery_executor.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 128b25b..35b4e84 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -219,7 +219,14 @@ class CeleryExecutor(BaseExecutor):
             chunksize = self._num_tasks_per_send_process(len(task_tuples_to_send))
             num_processes = min(len(task_tuples_to_send), self._sync_parallelism)
 
-            send_pool = Pool(processes=num_processes)
+            def reset_signals():
+                # Since we are run from inside the SchedulerJob, we don't want to
+                # inherit the signal handlers that we registered there.
+                import signal
+                signal.signal(signal.SIGINT, signal.SIG_DFL)
+                signal.signal(signal.SIGTERM, signal.SIG_DFL)
+
+            send_pool = Pool(processes=num_processes, initializer=reset_signals)
             key_and_async_results = send_pool.map(
                 send_task_to_executor,
                 task_tuples_to_send,