You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ep...@apache.org on 2023/10/30 11:26:15 UTC

(airflow) 20/46: Fix triggerer thread crash in daemon mode (#34931)

This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v2-7-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 7559c1cabed4fc3c7a7d5aa67ac1461915a1ead2
Author: Daniel Dyląg <Bi...@users.noreply.github.com>
AuthorDate: Sat Oct 14 17:56:24 2023 +0200

    Fix triggerer thread crash in daemon mode (#34931)
    
    * Fixes #34816
    
    Change the order of operations so that the async child thread is created after forking when entering the daemon context.
    
    This makes sure that the thread stays alive in the internal loop.
    
    ---------
    
    Co-authored-by: daniel.dylag <da...@gmail.com>
    (cherry picked from commit 9c1e8c28307cc808739a3535e0d7901d0699dcf4)
---
 airflow/cli/commands/triggerer_command.py | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/airflow/cli/commands/triggerer_command.py b/airflow/cli/commands/triggerer_command.py
index 2288f1537f..5ddb4e23b6 100644
--- a/airflow/cli/commands/triggerer_command.py
+++ b/airflow/cli/commands/triggerer_command.py
@@ -58,7 +58,6 @@ def triggerer(args):
     settings.MASK_SECRETS_IN_LOGS = True
     print(settings.HEADER)
     triggerer_heartrate = conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
-    triggerer_job_runner = TriggererJobRunner(job=Job(heartrate=triggerer_heartrate), capacity=args.capacity)
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -77,10 +76,16 @@ def triggerer(args):
                 umask=int(settings.DAEMON_UMASK, 8),
             )
             with daemon_context, _serve_logs(args.skip_serve_logs):
+                triggerer_job_runner = TriggererJobRunner(
+                    job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
+                )
                 run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
         with _serve_logs(args.skip_serve_logs):
+            triggerer_job_runner = TriggererJobRunner(
+                job=Job(heartrate=triggerer_heartrate), capacity=args.capacity
+            )
             run_job(job=triggerer_job_runner.job, execute_callable=triggerer_job_runner._execute)