Posted to commits@airflow.apache.org by po...@apache.org on 2021/08/02 13:44:08 UTC

[airflow] 13/22: Moves SchedulerJob initialization to within daemon context (#17157)

This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v2-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit ca679720653242d3c656d41fb93d61a5c7c79bb8
Author: Jarek Potiuk <ja...@potiuk.com>
AuthorDate: Sun Aug 1 20:45:01 2021 +0200

    Moves SchedulerJob initialization to within daemon context (#17157)
    
    In the Scheduler, the SchedulerJob was instantiated before the daemon
    context was activated. SchedulerJob is a SQLAlchemy ORM object, and
    instantiating it opens a connection to Postgres.
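
    As a minimal illustration (not the Airflow code itself; the connection
    URI is a placeholder), a SQLAlchemy connection wraps an OS-level socket
    to the database server:

        from sqlalchemy import create_engine, text

        # Engines are lazy: the TCP socket to Postgres is opened on connect().
        engine = create_engine("postgresql+psycopg2://airflow@localhost/airflow")
        conn = engine.connect()  # holds a raw socket file descriptor
        print(conn.execute(text("SELECT 1")).scalar())

    That underlying file descriptor is what daemonization can invalidate.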
    
    When you activate the daemon context, the process is forked under the
    hood, and while some of the open file descriptors are carried over into
    the fork (stdin and stderr, but also the open log file handle), the
    established socket for the DB connection is not.
    
    As a result, when the scheduler was started with the --daemon flag,
    the error `SSL SYSCALL error: Socket operation on non-socket` was
    raised.
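
    A stripped-down sketch of the failure mode, assuming the python-daemon
    package (`fake_db_connect` is a hypothetical stand-in for SQLAlchemy
    opening its Postgres socket):

        import socket

        import daemon

        def fake_db_connect():
            # Stand-in for SchedulerJob/SQLAlchemy connecting to Postgres.
            return socket.create_connection(("localhost", 5432))

        conn = fake_db_connect()  # socket opened in the parent process

        # Entering the context closes open file descriptors (except those
        # listed in files_preserve) and detaches the process via fork, so
        # the child no longer owns conn's socket.
        with daemon.DaemonContext():
            conn.sendall(b"ping")  # fails: descriptor no longer backs a socket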
    
    This PR moves the SchedulerJob initialization inside the daemon context,
    so the connection to Postgres is only established after the process has
    been forked and daemonized.
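
    With the fix, the ordering is reversed, along the lines of:

        with daemon.DaemonContext():
            conn = fake_db_connect()  # opened by the daemonized child
            conn.sendall(b"ping")     # works: the child owns the descriptor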
    
    Fixes: #17120
    (cherry picked from commit e8fc3acfd9884312669c1d85b71f42a9aab29cf8)
---
 airflow/cli/commands/scheduler_command.py | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)

diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 368db6f..44674f0 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -29,17 +29,21 @@ from airflow.utils import cli as cli_utils
 from airflow.utils.cli import process_subdir, setup_locations, setup_logging, sigint_handler, sigquit_handler
 
 
+def _create_scheduler_job(args):
+    job = SchedulerJob(
+        subdir=process_subdir(args.subdir),
+        num_runs=args.num_runs,
+        do_pickle=args.do_pickle,
+    )
+    return job
+
+
 @cli_utils.action_logging
 def scheduler(args):
     """Starts Airflow Scheduler"""
     skip_serve_logs = args.skip_serve_logs
 
     print(settings.HEADER)
-    job = SchedulerJob(
-        subdir=process_subdir(args.subdir),
-        num_runs=args.num_runs,
-        do_pickle=args.do_pickle,
-    )
 
     if args.daemon:
         pid, stdout, stderr, log_file = setup_locations(
@@ -54,9 +58,11 @@ def scheduler(args):
                 stderr=stderr_handle,
             )
             with ctx:
+                job = _create_scheduler_job(args)
                 sub_proc = _serve_logs(skip_serve_logs)
                 job.run()
     else:
+        job = _create_scheduler_job(args)
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)