You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/18 00:03:09 UTC

[airflow] branch main updated: Graceful scheduler shutdown on error (#18092)

This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a63bf2  Graceful scheduler shutdown on error (#18092)
9a63bf2 is described below

commit 9a63bf2efb7b45ededbe84039d5f3cf6c2cfb853
Author: Nicolas Goll-Perrier <sp...@users.noreply.github.com>
AuthorDate: Sat Sep 18 02:02:49 2021 +0200

    Graceful scheduler shutdown on error (#18092)
    
    closes: https://github.com/apache/airflow/issues/18096
    
    This fixes a rare process deadlock that can occur when log serving is in use.
    If the scheduler, for any reason, were to encounter an unrecoverable error (such as a loss of connectivity to the database, or anything forcing the process to exit), the serve_logs subprocess would not be properly terminated.
    
    As a result, the process hangs instead of gracefully shutting down, requiring an external restart.
    
    By ensuring the `.terminate()` function is _always_ called in case of a failure, the parent process can properly shut itself down.
---
 airflow/cli/commands/celery_command.py       | 18 +++++++++++-------
 airflow/cli/commands/scheduler_command.py    | 24 +++++++++++++-----------
 tests/cli/commands/test_celery_command.py    | 18 ++++++++++++++++++
 tests/cli/commands/test_scheduler_command.py | 17 +++++++++++++++++
 4 files changed, 59 insertions(+), 18 deletions(-)

diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index 451f72c..762fd88 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -88,6 +88,15 @@ def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
     return None
 
 
+def _run_worker(options, skip_serve_logs):
+    sub_proc = _serve_logs(skip_serve_logs)
+    try:
+        celery_app.worker_main(options)
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
+
+
 @cli_utils.action_logging
 def worker(args):
     """Starts Airflow Celery worker"""
@@ -173,15 +182,10 @@ def worker(args):
                 stderr=stderr_handle,
             )
             with ctx:
-                sub_proc = _serve_logs(skip_serve_logs)
-                celery_app.worker_main(options)
+                _run_worker(options=options, skip_serve_logs=skip_serve_logs)
     else:
         # Run Celery worker in the same process
-        sub_proc = _serve_logs(skip_serve_logs)
-        celery_app.worker_main(options)
-
-    if sub_proc:
-        sub_proc.terminate()
+        _run_worker(options=options, skip_serve_logs=skip_serve_logs)
 
 
 @cli_utils.action_logging
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 44674f0..978d167 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -38,11 +38,20 @@ def _create_scheduler_job(args):
     return job
 
 
+def _run_scheduler_job(args):
+    skip_serve_logs = args.skip_serve_logs
+    job = _create_scheduler_job(args)
+    sub_proc = _serve_logs(skip_serve_logs)
+    try:
+        job.run()
+    finally:
+        if sub_proc:
+            sub_proc.terminate()
+
+
 @cli_utils.action_logging
 def scheduler(args):
     """Starts Airflow Scheduler"""
-    skip_serve_logs = args.skip_serve_logs
-
     print(settings.HEADER)
 
     if args.daemon:
@@ -58,19 +67,12 @@ def scheduler(args):
                 stderr=stderr_handle,
             )
             with ctx:
-                job = _create_scheduler_job(args)
-                sub_proc = _serve_logs(skip_serve_logs)
-                job.run()
+                _run_scheduler_job(args=args)
     else:
-        job = _create_scheduler_job(args)
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
         signal.signal(signal.SIGQUIT, sigquit_handler)
-        sub_proc = _serve_logs(skip_serve_logs)
-        job.run()
-
-    if sub_proc:
-        sub_proc.terminate()
+        _run_scheduler_job(args=args)
 
 
 def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index 390c9d8..748077e 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -232,3 +232,21 @@ class TestWorkerStart(unittest.TestCase):
                 'prefork',
             ]
         )
+
+
+@pytest.mark.backend("mysql", "postgres")
+class TestWorkerFailure(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        cls.parser = cli_parser.get_parser()
+
+    @mock.patch('airflow.cli.commands.celery_command.Process')
+    @mock.patch('airflow.cli.commands.celery_command.celery_app')
+    @conf_vars({("core", "executor"): "CeleryExecutor"})
+    def test_worker_failure_gracefull_shutdown(self, mock_celery_app, mock_popen):
+        args = self.parser.parse_args(['celery', 'worker'])
+        mock_celery_app.run.side_effect = Exception('Mock exception to trigger runtime error')
+        try:
+            celery_command.worker(args)
+        finally:
+            mock_popen().terminate.assert_called()
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index 59bde70..00b851f 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -70,3 +70,20 @@ class TestSchedulerCommand(unittest.TestCase):
         with conf_vars({("core", "executor"): executor}):
             scheduler_command.scheduler(args)
             mock_process.assert_not_called()
+
+    @parameterized.expand(
+        [
+            ("LocalExecutor",),
+            ("SequentialExecutor",),
+        ]
+    )
+    @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+    @mock.patch("airflow.cli.commands.scheduler_command.Process")
+    def test_graceful_shutdown(self, executor, mock_process, mock_scheduler_job):
+        args = self.parser.parse_args(['scheduler'])
+        with conf_vars({("core", "executor"): executor}):
+            mock_scheduler_job.run.side_effect = Exception('Mock exception to trigger runtime error')
+            try:
+                scheduler_command.scheduler(args)
+            finally:
+                mock_process().terminate.assert_called()