You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/09/18 00:03:09 UTC
[airflow] branch main updated: Graceful scheduler shutdown on error
(#18092)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 9a63bf2 Graceful scheduler shutdown on error (#18092)
9a63bf2 is described below
commit 9a63bf2efb7b45ededbe84039d5f3cf6c2cfb853
Author: Nicolas Goll-Perrier <sp...@users.noreply.github.com>
AuthorDate: Sat Sep 18 02:02:49 2021 +0200
Graceful scheduler shutdown on error (#18092)
closes: https://github.com/apache/airflow/issues/18096
This fixes a rare but possible process deadlock that can occur when log serving is enabled.
If the scheduler, for any reason, were to encounter an unrecoverable error (such as a loss of connectivity to the database, or anything forcing the process to exit), the serve_logs subprocess would not be properly terminated.
As a result, the process hangs instead of shutting down gracefully, requiring an external restart.
By ensuring the `.terminate()` function is _always_ called in case of a failure (in both the scheduler and the Celery worker), the parent process can properly shut itself down.
---
airflow/cli/commands/celery_command.py | 18 +++++++++++-------
airflow/cli/commands/scheduler_command.py | 24 +++++++++++++-----------
tests/cli/commands/test_celery_command.py | 18 ++++++++++++++++++
tests/cli/commands/test_scheduler_command.py | 17 +++++++++++++++++
4 files changed, 59 insertions(+), 18 deletions(-)
diff --git a/airflow/cli/commands/celery_command.py b/airflow/cli/commands/celery_command.py
index 451f72c..762fd88 100644
--- a/airflow/cli/commands/celery_command.py
+++ b/airflow/cli/commands/celery_command.py
@@ -88,6 +88,15 @@ def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
return None
+def _run_worker(options, skip_serve_logs):  # Run the Celery worker, always cleaning up the serve_logs subprocess
+ sub_proc = _serve_logs(skip_serve_logs)  # Optional[Process]; presumably None when log serving is skipped
+ try:
+ celery_app.worker_main(options)
+ finally:  # runs even if worker_main raises, so the serve_logs child never keeps the worker from exiting
+ if sub_proc:
+ sub_proc.terminate()
+
+
@cli_utils.action_logging
def worker(args):
"""Starts Airflow Celery worker"""
@@ -173,15 +182,10 @@ def worker(args):
stderr=stderr_handle,
)
with ctx:
- sub_proc = _serve_logs(skip_serve_logs)
- celery_app.worker_main(options)
+ _run_worker(options=options, skip_serve_logs=skip_serve_logs)
else:
# Run Celery worker in the same process
- sub_proc = _serve_logs(skip_serve_logs)
- celery_app.worker_main(options)
-
- if sub_proc:
- sub_proc.terminate()
+ _run_worker(options=options, skip_serve_logs=skip_serve_logs)
@cli_utils.action_logging
diff --git a/airflow/cli/commands/scheduler_command.py b/airflow/cli/commands/scheduler_command.py
index 44674f0..978d167 100644
--- a/airflow/cli/commands/scheduler_command.py
+++ b/airflow/cli/commands/scheduler_command.py
@@ -38,11 +38,20 @@ def _create_scheduler_job(args):
return job
+def _run_scheduler_job(args):  # Create and run the scheduler job, always cleaning up the serve_logs subprocess
+ skip_serve_logs = args.skip_serve_logs
+ job = _create_scheduler_job(args)  # built before serve_logs starts, so a failure here leaves no child process behind
+ sub_proc = _serve_logs(skip_serve_logs)  # Optional[Process]; presumably None when log serving is skipped
+ try:
+ job.run()
+ finally:  # runs even if job.run() raises, letting the scheduler process exit instead of hanging
+ if sub_proc:
+ sub_proc.terminate()
+
+
@cli_utils.action_logging
def scheduler(args):
"""Starts Airflow Scheduler"""
- skip_serve_logs = args.skip_serve_logs
-
print(settings.HEADER)
if args.daemon:
@@ -58,19 +67,12 @@ def scheduler(args):
stderr=stderr_handle,
)
with ctx:
- job = _create_scheduler_job(args)
- sub_proc = _serve_logs(skip_serve_logs)
- job.run()
+ _run_scheduler_job(args=args)
else:
- job = _create_scheduler_job(args)
signal.signal(signal.SIGINT, sigint_handler)
signal.signal(signal.SIGTERM, sigint_handler)
signal.signal(signal.SIGQUIT, sigquit_handler)
- sub_proc = _serve_logs(skip_serve_logs)
- job.run()
-
- if sub_proc:
- sub_proc.terminate()
+ _run_scheduler_job(args=args)
def _serve_logs(skip_serve_logs: bool = False) -> Optional[Process]:
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index 390c9d8..748077e 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -232,3 +232,21 @@ class TestWorkerStart(unittest.TestCase):
'prefork',
]
)
+
+
+@pytest.mark.backend("mysql", "postgres")
+class TestWorkerFailure(unittest.TestCase):  # the worker must stop serve_logs even when Celery crashes
+ @classmethod
+ def setUpClass(cls):
+ cls.parser = cli_parser.get_parser()
+
+ @mock.patch('airflow.cli.commands.celery_command.Process')
+ @mock.patch('airflow.cli.commands.celery_command.celery_app')
+ @conf_vars({("core", "executor"): "CeleryExecutor"})
+ def test_worker_failure_gracefull_shutdown(self, mock_celery_app, mock_process):
+ args = self.parser.parse_args(['celery', 'worker'])
+ # worker() calls celery_app.worker_main (not .run), so the failure must be injected there
+ mock_celery_app.worker_main.side_effect = Exception('Mock exception to trigger runtime error')
+ with self.assertRaisesRegex(Exception, 'Mock exception to trigger runtime error'):
+ celery_command.worker(args)
+ mock_process.return_value.terminate.assert_called_once()
diff --git a/tests/cli/commands/test_scheduler_command.py b/tests/cli/commands/test_scheduler_command.py
index 59bde70..00b851f 100644
--- a/tests/cli/commands/test_scheduler_command.py
+++ b/tests/cli/commands/test_scheduler_command.py
@@ -70,3 +70,20 @@ class TestSchedulerCommand(unittest.TestCase):
with conf_vars({("core", "executor"): executor}):
scheduler_command.scheduler(args)
mock_process.assert_not_called()
+
+ @parameterized.expand(
+ [
+ ("LocalExecutor",),
+ ("SequentialExecutor",),
+ ]
+ )
+ @mock.patch("airflow.cli.commands.scheduler_command.SchedulerJob")
+ @mock.patch("airflow.cli.commands.scheduler_command.Process")
+ def test_graceful_shutdown(self, executor, mock_process, mock_scheduler_job):  # serve_logs must be terminated when job.run() fails
+ args = self.parser.parse_args(['scheduler'])
+ with conf_vars({("core", "executor"): executor}):
+ # job.run() is called on the instance built from the patched SchedulerJob class, so fail that instance's run()
+ mock_scheduler_job.return_value.run.side_effect = Exception('Mock exception to trigger runtime error')
+ with self.assertRaisesRegex(Exception, 'Mock exception to trigger runtime error'):
+ scheduler_command.scheduler(args)
+ mock_process.return_value.terminate.assert_called_once()