You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airflow.apache.org by ka...@apache.org on 2021/01/19 22:23:56 UTC
[airflow] branch master updated: Fix webserver exiting when
gunicorn master crashes (#13518)
This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push:
new 8a4bd3c Fix webserver exiting when gunicorn master crashes (#13518)
8a4bd3c is described below
commit 8a4bd3c73eb46c5ddd9312c71772a423fb1fd688
Author: drago-f5a <64...@users.noreply.github.com>
AuthorDate: Tue Jan 19 16:23:40 2021 -0600
Fix webserver exiting when gunicorn master crashes (#13518)
* Correct the logic the webserver uses to choose the number of workers to spawn (#13469)
A key consequence of this fix is that the webserver will properly
exit when the gunicorn master dies and stops responding to signals.
---
airflow/cli/commands/webserver_command.py | 4 ++--
tests/cli/commands/test_webserver_command.py | 21 +++++++++++++++++----
2 files changed, 19 insertions(+), 6 deletions(-)
diff --git a/airflow/cli/commands/webserver_command.py b/airflow/cli/commands/webserver_command.py
index 8dd7300..6831553 100644
--- a/airflow/cli/commands/webserver_command.py
+++ b/airflow/cli/commands/webserver_command.py
@@ -258,7 +258,7 @@ class GunicornMonitor(LoggingMixin):
num_workers_running = self._get_num_workers_running()
if num_workers_running < self.num_workers_expected:
new_worker_count = min(
- num_workers_running - self.worker_refresh_batch_size, self.worker_refresh_batch_size
+ self.num_workers_expected - num_workers_running, self.worker_refresh_batch_size
)
self.log.debug(
'[%d / %d] Spawning %d workers',
@@ -266,7 +266,7 @@ class GunicornMonitor(LoggingMixin):
num_workers_running,
new_worker_count,
)
- self._spawn_new_workers(num_workers_running)
+ self._spawn_new_workers(new_worker_count)
return
# Now the number of running and expected worker should be equal
diff --git a/tests/cli/commands/test_webserver_command.py b/tests/cli/commands/test_webserver_command.py
index a73bead..5e01b13 100644
--- a/tests/cli/commands/test_webserver_command.py
+++ b/tests/cli/commands/test_webserver_command.py
@@ -71,9 +71,20 @@ class TestGunicornMonitor(unittest.TestCase):
@mock.patch('airflow.cli.commands.webserver_command.sleep')
def test_should_start_new_workers_when_missing(self, mock_sleep):
- self.monitor._get_num_ready_workers_running.return_value = 2
- self.monitor._get_num_workers_running.return_value = 2
+ self.monitor._get_num_ready_workers_running.return_value = 3
+ self.monitor._get_num_workers_running.return_value = 3
self.monitor._check_workers()
+ # missing one worker, starting just 1
+ self.monitor._spawn_new_workers.assert_called_once_with(1) # pylint: disable=no-member
+ self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
+ self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
+
+ @mock.patch('airflow.cli.commands.webserver_command.sleep')
+ def test_should_start_new_batch_when_missing_many_workers(self, mock_sleep):
+ self.monitor._get_num_ready_workers_running.return_value = 1
+ self.monitor._get_num_workers_running.return_value = 1
+ self.monitor._check_workers()
+ # missing 3 workers, but starting single batch (2)
self.monitor._spawn_new_workers.assert_called_once_with(2) # pylint: disable=no-member
self.monitor._kill_old_workers.assert_not_called() # pylint: disable=no-member
self.monitor._reload_gunicorn.assert_not_called() # pylint: disable=no-member
@@ -230,14 +241,16 @@ class TestCliWebServer(unittest.TestCase):
def _check_processes(self, ignore_running=False):
# Confirm that webserver hasn't been launched.
# pgrep returns exit status 1 if no process matched.
+ # Use more specific regexps (^) to avoid matching pytest run when running specific method.
+ # For instance, we want to be able to do: pytest -k 'gunicorn'
exit_code_pgrep_webserver = subprocess.Popen(["pgrep", "-c", "-f", "airflow webserver"]).wait()
- exit_code_pgrep_gunicorn = subprocess.Popen(["pgrep", "-c", "-f", "gunicorn"]).wait()
+ exit_code_pgrep_gunicorn = subprocess.Popen(["pgrep", "-c", "-f", "^gunicorn"]).wait()
if exit_code_pgrep_webserver != 1 or exit_code_pgrep_gunicorn != 1:
subprocess.Popen(["ps", "-ax"]).wait()
if exit_code_pgrep_webserver != 1:
subprocess.Popen(["pkill", "-9", "-f", "airflow webserver"]).wait()
if exit_code_pgrep_gunicorn != 1:
- subprocess.Popen(["pkill", "-9", "-f", "gunicorn"]).wait()
+ subprocess.Popen(["pkill", "-9", "-f", "^gunicorn"]).wait()
if not ignore_running:
raise AssertionError(
"Background processes are running that prevent the test from passing successfully."